Repository: tez Updated Branches: refs/heads/master 70096c169 -> 149a04c28
TEZ-2244. PipelinedSorter: Progressive allocation for sort-buffers (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/149a04c2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/149a04c2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/149a04c2 Branch: refs/heads/master Commit: 149a04c28d53988fd49510c5453fc19ac0d1030e Parents: 70096c1 Author: Rajesh Balamohan <[email protected]> Authored: Wed Oct 28 13:35:36 2015 -0700 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Oct 28 13:35:36 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/api/TezRuntimeConfiguration.java | 28 ++ .../common/sort/impl/PipelinedSorter.java | 187 +++++++---- .../output/OrderedPartitionedKVOutput.java | 2 + .../common/sort/impl/TestPipelinedSorter.java | 328 ++++++++++++++++--- .../TestOrderedPartitionedKVEdgeConfig.java | 9 + 6 files changed, 457 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4f58119..14e2bbe 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.2: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2244. PipelinedSorter: Progressive allocation for sort-buffers TEZ-2904. Pig can't specify task specific command opts TEZ-2888. Make critical path calculation resilient to AM crash TEZ-2899. 
Tez UI: DAG getting created with huge horizontal gap in between vertices http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index cf05546..caad6ef 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -128,6 +128,31 @@ public class TezRuntimeConfiguration { "combine.min.spills"; public static final int TEZ_RUNTIME_COMBINE_MIN_SPILLS_DEFAULT = 3; + /** + * Tries to allocate {@link #TEZ_RUNTIME_IO_SORT_MB} in chunks whose size (in MB) is specified by + * this parameter. + */ + @ConfigurationProperty(type = "integer") + public static final String + TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB = TEZ_RUNTIME_PREFIX + + "pipelined.sorter.min-block.size.in.mb"; + public static final int + TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT = 2000; + + /** + * Setting this to true would enable sorter + * to auto-allocate memory on need basis in progressive fashion. + * + * Setting to false would allocate all available memory during + * initialization of sorter. In such cases, {@link #TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB} + * would be honored and memory specified in {@link #TEZ_RUNTIME_IO_SORT_MB} + * would be initialized upfront. + */ + @ConfigurationProperty(type = "boolean") + public static final String TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY = TEZ_RUNTIME_PREFIX + + "pipelined.sorter.lazy-allocate.memory"; + public static final boolean + TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT = false; /** * String value.
@@ -498,6 +523,9 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES); tezRuntimeKeys.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS); tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS); + tezRuntimeKeys.add( + TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB); + tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY); tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB); tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES); tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS); http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 81f5211..f512a5d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -25,9 +25,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.IntBuffer; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; -import java.util.ListIterator; import java.util.PriorityQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -87,10 +85,6 @@ public class PipelinedSorter extends ExternalSorter { private final ProxyComparator hasher; // SortSpans private SortSpan span; - //Maintain a bunch of ByteBuffers (each of them can hold approximately 2 GB data) - @VisibleForTesting - protected final LinkedList<ByteBuffer> bufferList = new LinkedList<ByteBuffer>(); - private ListIterator<ByteBuffer> 
listIterator; //total memory capacity allocated to sorter private final long capacity; @@ -98,9 +92,6 @@ public class PipelinedSorter extends ExternalSorter { //track buffer overflow recursively in all buffers private int bufferOverflowRecursion; - private final int blockSize; - - // Merger private final SpanMerger merger; private final ExecutorService sortmaster; @@ -110,17 +101,44 @@ public class PipelinedSorter extends ExternalSorter { private final boolean pipelinedShuffle; + private long currentAllocatableMemory; + //Maintain a list of ByteBuffers + @VisibleForTesting + final List<ByteBuffer> buffers; + final int maxNumberOfBlocks; + private int bufferIndex = -1; + private final int MIN_BLOCK_SIZE; + private final boolean lazyAllocateMem; + // TODO Set additional countesr - total bytes written, spills etc. public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException { - this(outputContext,conf,numOutputs, initialMemoryAvailable, 0); - } - - PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, - long initialMemoryAvailable, int blkSize) throws IOException { super(outputContext, conf, numOutputs, initialMemoryAvailable); + lazyAllocateMem = this.conf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT); + + if (lazyAllocateMem) { + /** + * When lazy-allocation is enabled, framework takes care of auto + * allocating memory on need basis. 
Desirable block size is set to 256MB + */ + MIN_BLOCK_SIZE = 256 << 20; //256 MB + } else { + int minBlockSize = conf.getInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, + TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT); + Preconditions.checkArgument( + (minBlockSize > 0 && minBlockSize < 2047), + TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB + + "=" + minBlockSize + " should be a positive value between 0 and 2047"); + MIN_BLOCK_SIZE = minBlockSize << 20; + } + StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ") .append(outputContext.getDestinationVertexName()).append(": "); partitionBits = bitcount(partitions)+1; @@ -135,23 +153,7 @@ public class PipelinedSorter extends ExternalSorter { final long sortmb = this.availableMemoryMb; // buffers and accounting - long maxMemUsage = sortmb << 20; - - this.blockSize = computeBlockSize(blkSize, maxMemUsage); - - long usage = sortmb << 20; - //Divide total memory into different blocks. 
- int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize)); - initialSetupLogLine.append("#blocks=").append(numberOfBlocks); - initialSetupLogLine.append(", maxMemUsage=").append(maxMemUsage); - initialSetupLogLine.append(", BLOCK_SIZE=").append(blockSize); - initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled()); - initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle); - initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails); - initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append( - "=").append( - sortmb); - + long maxMemLimit = sortmb << 20; initialSetupLogLine.append(", UsingHashComparator="); // k/v serialization @@ -166,20 +168,43 @@ public class PipelinedSorter extends ExternalSorter { LOG.info(initialSetupLogLine.toString()); long totalCapacityWithoutMeta = 0; - for (int i = 0; i < numberOfBlocks; i++) { - Preconditions.checkArgument(usage > 0, "usage can't be less than zero " + usage); - long size = Math.min(usage, blockSize); + long availableMem = maxMemLimit; + int numBlocks = 0; + while(availableMem > 0) { + long size = Math.min(availableMem, computeBlockSize(availableMem, maxMemLimit)); int sizeWithoutMeta = (int) ((size) - (size % METASIZE)); - bufferList.add(ByteBuffer.allocate(sizeWithoutMeta)); totalCapacityWithoutMeta += sizeWithoutMeta; - usage -= size; + availableMem -= size; + numBlocks++; } + currentAllocatableMemory = maxMemLimit; + maxNumberOfBlocks = numBlocks; capacity = totalCapacityWithoutMeta; - listIterator = bufferList.listIterator(); + buffers = Lists.newArrayListWithCapacity(maxNumberOfBlocks); + allocateSpace(); //Allocate the first block + if (!lazyAllocateMem) { + LOG.info("Pre allocating rest of memory buffers upfront"); + while(allocateSpace() != null); + } - Preconditions.checkArgument(listIterator.hasNext(), "Buffer list seems to be empty " + bufferList.size()); - span = new 
SortSpan(listIterator.next(), 1024*1024, 16, this.comparator); + initialSetupLogLine.append("#blocks=").append(maxNumberOfBlocks); + initialSetupLogLine.append(", maxMemUsage=").append(maxMemLimit); + initialSetupLogLine.append(", lazyAllocateMem=").append( + lazyAllocateMem); + initialSetupLogLine.append(", minBlockSize=").append(MIN_BLOCK_SIZE); + initialSetupLogLine.append(", initial BLOCK_SIZE=").append(buffers.get(0).capacity()); + initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled()); + initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle); + initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails); + initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append( + "=").append( + sortmb); + + Preconditions.checkState(buffers.size() > 0, "Atleast one buffer needs to be present"); + LOG.info(initialSetupLogLine.toString()); + + span = new SortSpan(buffers.get(bufferIndex), 1024 * 1024, 16, this.comparator); merger = new SpanMerger(); // SpanIterators are comparable final int sortThreads = this.conf.getInt( @@ -197,18 +222,67 @@ public class PipelinedSorter extends ExternalSorter { minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3); } + ByteBuffer allocateSpace() { + if (currentAllocatableMemory <= 0) { + //No space available. 
+ return null; + } + + int size = computeBlockSize(currentAllocatableMemory, availableMemoryMb << 20); + currentAllocatableMemory -= size; + int sizeWithoutMeta = (size) - (size % METASIZE); + ByteBuffer space = ByteBuffer.allocate(sizeWithoutMeta); + + buffers.add(space); + bufferIndex++; + + Preconditions.checkState(buffers.size() <= maxNumberOfBlocks, + "Number of blocks " + buffers.size() + + " is exceeding " + maxNumberOfBlocks); + + LOG.info("Newly allocated block size=" + size + + ", index=" + bufferIndex + + ", Number of buffers=" + buffers.size() + + ", currentAllocatableMemory=" + currentAllocatableMemory + + ", currentBufferSize=" + space.capacity() + + ", total=" + (availableMemoryMb << 20)); + return space; + } + + @VisibleForTesting - static int computeBlockSize(int blkSize, long maxMemUsage) { - if (blkSize == 0) { - return (int) Math.min(maxMemUsage, Integer.MAX_VALUE); - } else { - Preconditions.checkArgument(blkSize > 0, "blkSize should be between 1 and Integer.MAX_VALUE"); - if (blkSize >= maxMemUsage) { - return (maxMemUsage > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) maxMemUsage; - } else { - return blkSize; + int computeBlockSize(long availableMem, long maxAllocatedMemory) { + int maxBlockSize = 0; + /** + * When lazy-allocation is enabled, framework takes care of auto allocating + * memory on need basis. In such cases, first buffer starts with 32 MB. + */ + if (lazyAllocateMem) { + if (buffers == null || buffers.isEmpty()) { + return 32 << 20; //32 MB + } + } + + //Honor MIN_BLOCK_SIZE + maxBlockSize = Math.max(MIN_BLOCK_SIZE, maxBlockSize); + + if (availableMem < maxBlockSize) { + maxBlockSize = (int) availableMem; + } + + int maxMem = (maxAllocatedMemory > Integer.MAX_VALUE) ? 
Integer.MAX_VALUE : (int) maxAllocatedMemory; + if (maxBlockSize > maxMem) { + maxBlockSize = maxMem; + } + + availableMem -= maxBlockSize; + if (availableMem < MIN_BLOCK_SIZE) { + if ((maxBlockSize + availableMem) < Integer.MAX_VALUE) { + //Merge remaining with last block + maxBlockSize += availableMem; } } + return maxBlockSize; } private int bitcount(int n) { @@ -237,8 +311,8 @@ public class PipelinedSorter extends ExternalSorter { if (pipelinedShuffle && ret) { sendPipelinedShuffleEvents(); } - //safe to reset the iterator - listIterator = bufferList.listIterator(); + //safe to reset bufferIndex to 0; + bufferIndex = 0; int items = 1024*1024; int perItem = 16; if(span.length() != 0) { @@ -250,9 +324,9 @@ public class PipelinedSorter extends ExternalSorter { items = 1024*1024; } } - Preconditions.checkArgument(listIterator.hasNext(), "block iterator should not be empty"); + Preconditions.checkArgument(buffers.get(bufferIndex) != null, "block should not be empty"); //TODO: fix per item being passed. 
- span = new SortSpan((ByteBuffer)listIterator.next().clear(), (1024*1024), + span = new SortSpan((ByteBuffer)buffers.get(bufferIndex).clear(), (1024*1024), perItem, ConfigUtils.getIntermediateOutputKeyComparator(this.conf)); } else { // queue up the sort @@ -325,7 +399,7 @@ public class PipelinedSorter extends ExternalSorter { // restore limit span.kvbuffer.position(keystart); this.sort(); - if (span.length() == 0 || bufferOverflowRecursion > bufferList.size()) { + if (span.length() == 0 || bufferOverflowRecursion > buffers.size()) { // spill the current key value pair spillSingleRecord(key, value, partition); bufferOverflowRecursion = 0; @@ -562,7 +636,7 @@ public class PipelinedSorter extends ExternalSorter { sortmaster.shutdown(); //safe to clean up - bufferList.clear(); + buffers.clear(); if(indexCacheList.isEmpty()) { @@ -911,11 +985,12 @@ public class PipelinedSorter extends ExternalSorter { LOG.info(outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem)); if(remaining.remaining() < METASIZE+perItem) { //Check if we can get the next Buffer from the main buffer list - if (listIterator.hasNext()) { + ByteBuffer space = allocateSpace(); + if (space != null) { LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" + mapOutputRecordCounter.getValue()); reinit = true; - return listIterator.next(); + return space; } return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 45b6713..c0b0760 100644 --- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -225,6 +225,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS); http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 92163c4..2cebea4 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -64,7 +64,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; * limitations under the License. 
*/ public class TestPipelinedSorter { - private static Configuration conf = new Configuration(); private static FileSystem localFs = null; private static Path workDir = null; private OutputContext outputContext; @@ -76,7 +75,7 @@ public class TestPipelinedSorter { private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap(); static { - conf.set("fs.defaultFS", "file:///"); + Configuration conf = getConf(); try { localFs = FileSystem.getLocal(conf); workDir = new Path( @@ -99,7 +98,11 @@ public class TestPipelinedSorter { TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); this.outputContext = createMockOutputContext(counters, appId, uniqueId); + } + public static Configuration getConf() { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", "file:///"); //To enable PipelinedSorter conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name()); @@ -110,15 +113,17 @@ public class TestPipelinedSorter { conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); //Setup localdirs - String localDirs = workDir.toString(); - conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs); + if (workDir != null) { + String localDirs = workDir.toString(); + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs); + } + return conf; } @After public void reset() throws IOException { cleanup(); localFs.mkdirs(workDir); - conf = new Configuration(); } @Test @@ -126,6 +131,7 @@ public class TestPipelinedSorter { //TODO: need to support multiple partition testing later //# partition, # of keys, size per key, InitialMem, blockSize + Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20); @@ -133,6 +139,7 @@ public class TestPipelinedSorter { @Test public void testWithoutPartitionStats() throws IOException { + Configuration conf = getConf(); 
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, false); //# partition, # of keys, size per key, InitialMem, blockSize basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20); @@ -141,6 +148,7 @@ public class TestPipelinedSorter { @Test public void testWithEmptyData() throws IOException { + Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); //# partition, # of keys, size per key, InitialMem, blockSize basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20); @@ -150,9 +158,12 @@ public class TestPipelinedSorter { public void testEmptyDataWithPipelinedShuffle() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 1 *1024 * 1024; + Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 1 << 20); + initialAvailableMem); writeData(sorter, 0, 1<<20); @@ -186,16 +197,19 @@ public class TestPipelinedSorter { @Test public void testKVExceedsBuffer2() throws IOException { // a list of 4 blocks each 256kb, 2KV pair, key 1mb, value 1mb - basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<10); + basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<20); } @Test public void testExceedsKVWithMultiplePartitions() throws IOException { + Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); this.numOutputs = 5; this.initialAvailableMem = 1 * 1024 * 1024; + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 0); + initialAvailableMem); writeData(sorter, 100, 1<<20); verifyCounters(sorter, outputContext); @@ -205,9 +219,12 @@ public 
class TestPipelinedSorter { public void testExceedsKVWithPipelinedShuffle() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 1 *1024 * 1024; + Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 1 << 20); + initialAvailableMem); writeData(sorter, 5, 1<<20); @@ -222,9 +239,12 @@ public class TestPipelinedSorter { public void test_TEZ_2602_50mb() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 1 *1024 * 1024; + Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 1 << 20); + initialAvailableMem); Text value = new Text("1"); long size = 50 * 1024 * 1024; @@ -238,13 +258,14 @@ public class TestPipelinedSorter { sorter.close(); } - @Test + //@Test public void testLargeDataWithMixedKV() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 48 *1024 * 1024; + Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 0); + initialAvailableMem); //write 10 MB KV Text key = new Text(RandomStringUtils.randomAlphanumeric(10 << 20)); @@ -293,13 +314,15 @@ public class TestPipelinedSorter { // 20 KVpairs of 2X10kb, 10 KV of 2X200kb, 20KV of 2X10kb int numkeys[] = {20, 10, 20}; int keylens[] = {10<<10, 200<<10, 10<<10}; - basicTest2(1, numkeys, keylens, (1 * 1024l * 1024l), 1 << 18); + basicTest2(1, numkeys, keylens, (10 * 
1024l * 1024l), 2); } @Test public void testWithCustomComparator() throws IOException { //Test with custom comparator - conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName()); + Configuration conf = getConf(); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, + CustomComparator.class.getName()); basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20); } @@ -307,10 +330,13 @@ public class TestPipelinedSorter { public void testWithPipelinedShuffle() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 5 *1024 * 1024; + Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 1 << 20); + initialAvailableMem); //Write 100 keys each of size 10 writeData(sorter, 10000, 100); @@ -323,11 +349,14 @@ public class TestPipelinedSorter { @Test public void testCountersWithMultiplePartitions() throws IOException { + Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); this.numOutputs = 5; this.initialAvailableMem = 5 * 1024 * 1024; + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 0); + initialAvailableMem); writeData(sorter, 10000, 100); verifyCounters(sorter, outputContext); @@ -336,8 +365,11 @@ public class TestPipelinedSorter { public void basicTest2(int partitions, int[] numkeys, int[] keysize, long initialAvailableMem, int blockSize) throws IOException { this.numOutputs = partitions; // single output + Configuration conf = getConf(); + 
conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 100); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, blockSize); + initialAvailableMem); writeData2(sorter, numkeys, keysize); verifyCounters(sorter, outputContext); } @@ -360,10 +392,13 @@ public class TestPipelinedSorter { } public void basicTest(int partitions, int numKeys, int keySize, - long initialAvailableMem, int blockSize) throws IOException { + long initialAvailableMem, int minBlockSize) throws IOException { this.numOutputs = partitions; // single output + Configuration conf = getConf(); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, minBlockSize >> 20); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, blockSize); + initialAvailableMem); writeData(sorter, numKeys, keySize); @@ -420,50 +455,250 @@ public class TestPipelinedSorter { @Test + //Intentionally not having timeout //Its not possible to allocate > 2 GB in test environment. Carry out basic checks here. public void memTest() throws IOException { //Verify if > 2 GB can be set via config + Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076); long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l); Assert.assertTrue(size == (3076l << 20)); //Verify number of block buffers allocated this.initialAvailableMem = 10 * 1024 * 1024; + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 1 << 20); - Assert.assertTrue(sorter.bufferList.size() == 10); + initialAvailableMem); + Assert.assertTrue(sorter.maxNumberOfBlocks == 10); + //10 MB available, request for 3 MB chunk. Last 1 MB gets added to previous chunk. 
+ conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3); sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 3 << 20); - Assert.assertTrue(sorter.bufferList.size() == 4); + initialAvailableMem); + Assert.assertTrue(sorter.maxNumberOfBlocks == 3); + //10 MB available, request for 10 MB min chunk. Would get 1 block. + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 10); sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, - initialAvailableMem, 10 << 20); - Assert.assertTrue(sorter.bufferList.size() == 1); + initialAvailableMem); + Assert.assertTrue(sorter.maxNumberOfBlocks == 1); + + //Verify block sizes (10 MB min chunk size), but available mem is zero. + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 10); + sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, initialAvailableMem); + Assert.assertTrue(sorter.maxNumberOfBlocks == 1); + int blockSize = sorter.computeBlockSize(0, (10 << 20)); + //available is zero. Can't allocate any more buffer. + Assert.assertTrue(blockSize == 0); + + //300 MB available. Request for 200 MB min block size. It would allocate a block with 200 MB, + // but last 100 would get clubbed. Hence, it would return 300 MB block. + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 200); + sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (300 << 20)); + Assert.assertTrue(sorter.maxNumberOfBlocks == 1); + blockSize = sorter.computeBlockSize((300 << 20), (300 << 20)); + Assert.assertTrue(blockSize == (300 << 20)); + + //300 MB available. Request for 3500 MB min block size. 
throw exception + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3500); + try { + sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, + (300 << 20)); + } catch(IllegalArgumentException iae ) { + assertTrue(iae.getMessage().contains("positive value between 0 and 2047")); + } - //Verify block sizes - int blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024)); - //initialAvailableMem is < 2 GB. So consider it as the blockSize - Assert.assertTrue(blockSize == (10 * 1024 * 1024)); + //64 MB available. Request for 32 MB min block size. + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 32); + sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (64 << 20)); + Assert.assertTrue(sorter.maxNumberOfBlocks == 2); + blockSize = sorter.computeBlockSize((64 << 20), (64 << 20)); + Assert.assertTrue(blockSize == (32 << 20)); + + blockSize = sorter.computeBlockSize((32 << 20), (64 << 20)); + Assert.assertTrue(blockSize == (32 << 20)); + + blockSize = sorter.computeBlockSize((48 << 20), (64 << 20)); + Assert.assertTrue(blockSize == (48 << 20)); + + //64 MB available. Request for 8 MB min block size. + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 8); + sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (64 << 20)); + Assert.assertTrue(sorter.maxNumberOfBlocks == 8); + blockSize = sorter.computeBlockSize((64 << 20), (64 << 20)); + //Should return 8 MB, which is the min block size. + Assert.assertTrue(blockSize == (8 << 20)); + } - blockSize = PipelinedSorter.computeBlockSize(0, (10 * 1024 * 1024 * 1024l)); - //initialAvailableMem is > 2 GB.
Restrict block size to Integer.MAX_VALUE; - Assert.assertTrue(blockSize == Integer.MAX_VALUE); + @Test + //Intentionally not having timeout + public void test_without_lazyMemAllocation() throws IOException { + this.numOutputs = 10; + Configuration conf = getConf(); + + //128 MB. Pre-allocate. Request for default block size. Get 1 buffer + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false); + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (128l << 20)); + assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(), + sorter.buffers.size() == 1); + + //128 MB. Pre-allocate. Get 2 buffers + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 62); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false); + sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (128l << 20)); + assertTrue("Expected 2 sort buffers. current len=" + sorter.buffers.size(), + sorter.buffers.size() == 2); + + //48 MB. Pre-allocate. But request for a larger block size (62 MB). Get 1 buffer + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 48); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 62); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false); + sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (48l << 20)); + assertTrue("Expected 1 sort buffers.
current len=" + sorter.buffers.size(), + sorter.buffers.size() == 1); + } + + @Test + //Intentionally not having timeout + public void test_with_lazyMemAllocation() throws IOException { + this.numOutputs = 10; + Configuration conf = getConf(); + + //128 MB. Do not pre-allocate. + // Get 32 MB buffer first and the another buffer with 96 on filling up + // the 32 MB buffer. + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (128l << 20)); + assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(), + sorter.buffers.size() == 1); + assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024); + writeData(sorter, 100, 1024*1024, false); //100 1 MB KV. Will spill + + //Now it should have created 2 buffers, 32 & 96 MB buffers. + assertTrue(sorter.buffers.size() == 2); + assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024); + assertTrue(sorter.buffers.get(1).capacity() == 96 * 1024 * 1024); + closeSorter(sorter); + verifyCounters(sorter, outputContext); - blockSize = PipelinedSorter.computeBlockSize((1*1024*1024*1024), (10 * 1024 * 1024)); - //sort buffer is 10 MB. But block size requested is 1 GB. Restrict block size to 10 MB. - Assert.assertTrue(blockSize == (10 * 1024 * 1024)); + //TODO: Not sure if this would fail in build machines due to mem + //300 MB. Do not pre-allocate. + // Get 1 buffer with 62 MB. 
But grow to 2 buffers as data is written + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 300); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); + sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (300l << 20)); + assertTrue(sorter.buffers.size() == 1); + assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024); + + writeData(sorter, 50, 1024*1024, false); //50 1 MB KV to allocate 2nd buf + assertTrue(sorter.buffers.size() == 2); + assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024); + assertTrue(sorter.buffers.get(1).capacity() == 268 * 1024 * 1024); + + //48 MB. Do not pre-allocate. + // Get 32 MB buffer first invariably and proceed with the rest. + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 48); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); + sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (48l << 20)); + assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(), + sorter.buffers.size() == 1); + assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024); + writeData(sorter, 20, 1024*1024, false); //100 1 MB KV. Will spill + + //Now it should have created 2 buffers, 32 & 96 MB buffers. 
+ assertTrue(sorter.buffers.size() == 2); + assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024); + assertTrue(sorter.buffers.get(1).capacity() == 16 * 1024 * 1024); + closeSorter(sorter); + } + @Test + //Intentionally not having timeout + public void testLazyAllocateMem() throws IOException { + this.numOutputs = 10; + Configuration conf = getConf(); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 4500); try { - blockSize = PipelinedSorter.computeBlockSize(-1, (10 * 1024 * 1024 * 1024l)); - //block size can't set to -1 - fail(); - } catch(IllegalArgumentException e) { - Assert.assertTrue(e.getMessage().contains("should be between 1 and Integer.MAX_VALUE")); + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (4500l << 20)); + } catch (IllegalArgumentException iae) { + assertTrue(iae.getMessage().contains(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB)); + assertTrue(iae.getMessage().contains("value between 0 and 2047")); } + + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, -1); + try { + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (4500l << 20)); + } catch (IllegalArgumentException iae) { + assertTrue(iae.getMessage().contains(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB)); + assertTrue(iae.getMessage().contains("value between 0 and 2047")); + } + + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, -1); + try { + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (4500l << 20)); + } catch 
(IllegalArgumentException iae) { + assertTrue(iae.getMessage().contains(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB)); + assertTrue(iae.getMessage().contains("value between 0 and 2047")); + } + + } + + @Test + //Intentionally not having timeout + public void testWithLargeKeyValueWithMinBlockSize() throws IOException { + //2 MB key & 2 MB value, 48 MB sort buffer. block size is 16MB + basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20); } private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException { + writeData(sorter, numKeys, keyLen, true); + } + + private void writeData(ExternalSorter sorter, int numKeys, int keyLen, + boolean autoClose) throws IOException { sortedDataMap.clear(); for (int i = 0; i < numKeys; i++) { Text key = new Text(RandomStringUtils.randomAlphanumeric(keyLen)); @@ -471,8 +706,16 @@ public class TestPipelinedSorter { sorter.write(key, value); sortedDataMap.put(key.toString(), value.toString()); //for verifying data later } - sorter.flush(); - sorter.close(); + if (autoClose) { + closeSorter(sorter); + } + } + + private void closeSorter(ExternalSorter sorter) throws IOException { + if (sorter != null) { + sorter.flush(); + sorter.close(); + } } private void verifyData(IFile.Reader reader) @@ -481,6 +724,7 @@ public class TestPipelinedSorter { Text readValue = new Text(); DataInputBuffer keyIn = new DataInputBuffer(); DataInputBuffer valIn = new DataInputBuffer(); + Configuration conf = getConf(); SerializationFactory serializationFactory = new SerializationFactory(conf); Deserializer<Text> keyDeserializer = serializationFactory.getDeserializer(Text.class); Deserializer<Text> valDeserializer = serializationFactory.getDeserializer(Text.class); http://git-wip-us.apache.org/repos/asf/tez/blob/149a04c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java ---------------------------------------------------------------------- 
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java index f57731c..fabf52d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java @@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.conf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -122,6 +123,10 @@ public class TestOrderedPartitionedKVEdgeConfig { fromConf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.11f); fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 123); fromConf.set("io.shouldExist", "io"); + fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, + true); + fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, + 1000); Map<String, String> additionalConfs = new HashMap<String, String>(); additionalConfs.put("test.key.2", "key2"); additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111"); @@ -151,6 +156,10 @@ public class TestOrderedPartitionedKVEdgeConfig { Configuration outputConf = rebuiltOutput.conf; Configuration inputConf = rebuiltInput.conf; + assertTrue(outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT)); + assertEquals(1000, outputConf.getInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 2000)); assertEquals(3, 
outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 0)); assertEquals(1111, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0)); assertEquals(2222, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
