Repository: tez Updated Branches: refs/heads/master 7311d7d40 -> 3f4e8a7bf
TEZ-1803. Support > 2gb sort buffer in pipelinedsorter (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3f4e8a7b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3f4e8a7b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3f4e8a7b Branch: refs/heads/master Commit: 3f4e8a7bf2e8615d38770e6b0a2c648a8d078634 Parents: 7311d7d Author: Rajesh Balamohan <[email protected]> Authored: Thu Jan 22 13:23:08 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Jan 22 13:23:08 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/sort/impl/ExternalSorter.java | 22 +- .../common/sort/impl/PipelinedSorter.java | 90 ++++++-- .../common/sort/impl/dflt/DefaultSorter.java | 8 +- .../common/sort/impl/TestPipelinedSorter.java | 209 +++++++++++++++++++ .../sort/impl/dflt/TestDefaultSorter.java | 4 +- .../library/output/TestOnFileSortedOutput.java | 3 + 7 files changed, 304 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b4e529c..cc86772 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1803. Support > 2gb sort buffer in pipelinedsorter. TEZ-1826. Add option to disable split grouping and local mode option for tez-examples. TEZ-1982. TezChild setupUgi should not be using environment. TEZ-1980. Suppress tez-dag findbugs warnings until addressed. http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index 19d60e4..a1da36a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -101,7 +101,7 @@ public abstract class ExternalSorter { protected final int ifileReadAheadLength; protected final int ifileBufferSize; - protected final int availableMemoryMb; + protected final long availableMemoryMb; protected final IndexedSorter sorter; @@ -149,18 +149,10 @@ public abstract class ExternalSorter { rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw(); + LOG.info("Initial Mem : " + initialMemoryAvailable + ", assignedMb=" + ((initialMemoryAvailable >> 20))); int assignedMb = (int) (initialMemoryAvailable >> 20); - if (assignedMb <= 0) { - if (initialMemoryAvailable > 0) { // Rounded down to 0MB - may be > 0 && < 1MB - this.availableMemoryMb = 1; - LOG.warn("initialAvailableMemory: " + initialMemoryAvailable - + " is too low. Rounding to 1 MB"); - } else { - throw new RuntimeException("InitialMemoryAssigned is <= 0: " + initialMemoryAvailable); - } - } else { - this.availableMemoryMb = assignedMb; - } + //Let the overflow checks happen in appropriate sorter impls + this.availableMemoryMb = assignedMb; // sorter sorter = ReflectionUtils.newInstance(this.conf.getClass( @@ -302,9 +294,9 @@ public abstract class ExternalSorter { conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT); - Preconditions.checkArgument(initialMemRequestMb > 0 && initialMemRequestMb <= 2047, - TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB - + " should be larger than 0 and less than or equal to 2047"); + //Higher bound checks are done in individual sorter implementations + Preconditions.checkArgument(initialMemRequestMb > 0, + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " should be larger than 0"); long reqBytes = ((long) initialMemRequestMb) << 20; LOG.info("Requested SortBufferSize (" + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): " http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 9b171ab..c1a6637 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -25,14 +25,19 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.IntBuffer; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import java.util.PriorityQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -82,7 +87,17 @@ public class PipelinedSorter extends ExternalSorter { private final ProxyComparator hasher; // SortSpans private SortSpan span; - private ByteBuffer largeBuffer; + //Maintain a bunch of ByteBuffers (each of them can hold approximately 2 GB data) + @VisibleForTesting + protected final LinkedList<ByteBuffer> bufferList = new LinkedList<ByteBuffer>(); + private ListIterator<ByteBuffer> listIterator; + + //total memory capacity allocated to sorter + private long capacity; + + private static final int BLOCK_SIZE = 1536 << 20; + + // Merger private final SpanMerger merger; private final ExecutorService sortmaster; @@ -96,23 +111,42 @@ public class PipelinedSorter extends ExternalSorter { public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException { + this(outputContext,conf,numOutputs, initialMemoryAvailable, BLOCK_SIZE); + } + + public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, + long initialMemoryAvailable, int blockSize) throws IOException { super(outputContext, conf, numOutputs, initialMemoryAvailable); partitionBits = bitcount(partitions)+1; //sanity checks - final int sortmb = this.availableMemoryMb; + final long sortmb = this.availableMemoryMb; indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT); // buffers and accounting - int maxMemUsage = sortmb << 20; - maxMemUsage -= maxMemUsage % METASIZE; - largeBuffer = ByteBuffer.allocate(maxMemUsage); - Preconditions.checkArgument(largeBuffer.hasArray(), "Expected array backed byte buffer"); + long maxMemUsage = sortmb << 20; + Preconditions.checkArgument(blockSize > 0 && blockSize < Integer.MAX_VALUE,"Block size should be" + " within 1 - Integer.MAX_VALUE" + blockSize); + long usage = sortmb << 20; + //Divide total memory into different blocks. + int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize)); + LOG.info("Number of Blocks : " + numberOfBlocks + + ", maxMemUsage=" + maxMemUsage + ", BLOCK_SIZE=" + blockSize); + for (int i = 0; i < numberOfBlocks; i++) { + Preconditions.checkArgument(usage > 0, "usage can't be less than zero " + usage); + long size = Math.min(usage, blockSize); + int sizeWithoutMeta = (int) ((size) - (size % METASIZE)); + bufferList.add(ByteBuffer.allocate(sizeWithoutMeta)); + capacity += sizeWithoutMeta; + usage -= size; + } + listIterator = bufferList.listIterator(); + + LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb); - // TODO: configurable setting? - span = new SortSpan(largeBuffer, 1024*1024, 16, comparator); + Preconditions.checkArgument(listIterator.hasNext(), "Buffer list seems to be empty " + bufferList.size()); + span = new SortSpan(listIterator.next(), 1024*1024, 16, this.comparator); merger = new SpanMerger(); // SpanIterators are comparable final int sortThreads = this.conf.getInt( @@ -149,21 +183,28 @@ public class PipelinedSorter extends ExternalSorter { SortSpan newSpan = span.next(); if(newSpan == null) { + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); // sort in the same thread, do not wait for the thread pool merger.add(span.sort(sorter)); spill(); + stopWatch.stop(); + LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms"); + //safe to reset the iterator + listIterator = bufferList.listIterator(); int items = 1024*1024; int perItem = 16; if(span.length() != 0) { items = span.length(); perItem = span.kvbuffer.limit()/items; - items = (largeBuffer.capacity())/(METASIZE+perItem); + items = (int) ((span.capacity)/(METASIZE+perItem)); if(items > 1024*1024) { // our goal is to have 1M splits and sort early items = 1024*1024; } } - span = new SortSpan(largeBuffer, items, perItem, this.comparator); + Preconditions.checkArgument(listIterator.hasNext(), "block iterator should not be empty"); + span = new SortSpan((ByteBuffer)listIterator.next().clear(), (1024*1024), perItem, this.comparator); } else { // queue up the sort SortTask task = new SortTask(span, sorter); @@ -176,7 +217,7 @@ public class PipelinedSorter extends ExternalSorter { } @Override - public void write(Object key, Object value) + public void write(Object key, Object value) throws IOException { collect( key, value, partitioner.getPartition(key, value, partitions)); @@ -242,7 +283,7 @@ public class PipelinedSorter extends ExternalSorter { public void spill() throws IOException { // create spill file - final long size = largeBuffer.capacity() + final long size = capacity + + (partitions * APPROX_HEADER_LENGTH); final TezSpillRecord spillRec = new TezSpillRecord(partitions); final Path filename = @@ -307,7 +348,8 @@ public class PipelinedSorter extends ExternalSorter { spill(); sortmaster.shutdown(); - largeBuffer = null; + //safe to clean up + bufferList.clear(); numAdditionalSpills.increment(numSpills - 1); @@ -464,9 +506,12 @@ public class PipelinedSorter extends ExternalSorter { private int index = 0; private long eq = 0; + private boolean reinit = false; + private int capacity; + public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comparator) { - int capacity = source.remaining(); + capacity = source.remaining(); int metasize = METASIZE*maxItems; int dataSize = maxItems * perItem; if(capacity < (metasize+dataSize)) { @@ -552,10 +597,18 @@ public class PipelinedSorter extends ExternalSorter { public SortSpan next() { ByteBuffer remaining = end(); if(remaining != null) { + SortSpan newSpan = null; int items = length(); int perItem = kvbuffer.position()/items; - SortSpan newSpan = new SortSpan(remaining, items, perItem, this.comparator); + if (reinit) { //next mem block + //quite possible that the previous span had a length of 1. It is better to reinit here for new span. + items = 1024*1024; + perItem = 16; + } + newSpan = new SortSpan(remaining, items, perItem, this.comparator); newSpan.index = index+1; + LOG.info(String.format("New Span%d.length = %d, perItem = %d", newSpan.index, newSpan + .length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue()); return newSpan; } return null; @@ -578,6 +631,13 @@ public class PipelinedSorter extends ExternalSorter { int perItem = kvbuffer.position()/items; LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem)); if(remaining.remaining() < METASIZE+perItem) { + //Check if we can get the next Buffer from the main buffer list + if (listIterator.hasNext()) { + LOG.info("Getting memory from next block in the list, recordsWritten=" + + mapOutputRecordCounter.getValue()); + reinit = true; + return listIterator.next(); + } return null; } return remaining; http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index b99f319..56a3f27 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -123,7 +123,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { final float spillper = this.conf.getFloat( TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT, TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT); - final int sortmb = this.availableMemoryMb; + final int sortmb = (int) availableMemoryMb; + if (sortmb <= 0) { + throw new RuntimeException("InitialMemoryAssigned is <= 0: " + initialMemoryAvailable); + } + Preconditions.checkArgument(sortmb > 0 && sortmb <= 2047, + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + + " for DefaultSorter should be larger than 0 and less than or equal to 2047"); Preconditions.checkArgument(spillper <= (float) 1.0 && spillper > (float) 0.0, TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT + " should be greater than 0 and less than or equal to 1"); http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java new file mode 100644 index 0000000..7ba0bf4 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -0,0 +1,209 @@ +package org.apache.tez.runtime.library.common.sort.impl; + +import com.google.common.collect.Maps; +import org.apache.commons.math3.random.RandomDataGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.SerializationFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class TestPipelinedSorter { + private static final Configuration conf = new Configuration(); + private static FileSystem localFs = null; + private static Path workDir = null; + + private int numOutputs; + private long initialAvailableMem; + private OutputContext outputContext; + + //TODO: Need to make it nested structure so that multiple partition cases can be validated + private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap(); + + static { + conf.set("fs.defaultFS", "file:///"); + try { + localFs = FileSystem.getLocal(conf); + workDir = new Path( + new Path(System.getProperty("test.build.data", "/tmp")), + TestPipelinedSorter.class.getName()) + .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Before + public void setup() { + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TezCounters counters = new TezCounters(); + String uniqueId = UUID.randomUUID().toString(); + this.outputContext = createMockOutputContext(counters, appId, uniqueId); + + //To enable PipelinedSorter, set 2 threads + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, + HashPartitioner.class.getName()); + + //Setup localdirs + String localDirs = workDir.toString(); + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs); + } + + @After + public void cleanup() throws IOException { + localFs.delete(workDir, true); + sortedDataMap.clear(); + } + + @Test + public void basicTest() throws IOException { + //TODO: need to support multiple partition testing later + + //# partition, # of keys, size per key, InitialMem, blockSize + basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20); + } + + public void basicTest(int partitions, int numKeys, int keySize, + long initialAvailableMem, int blockSize) throws IOException { + this.numOutputs = partitions; // single output + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, + initialAvailableMem, blockSize); + + //Write 100 keys each of size 10 + writeData(sorter, numKeys, keySize); + + Path outputFile = sorter.finalOutputFile; + FileSystem fs = outputFile.getFileSystem(conf); + + IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096); + //Verify dataset + verifyData(reader); + reader.close(); + } + + @Test + //Its not possible to allocate > 2 GB in test environment. Carry out basic checks here. + public void memTest() throws IOException { + //Verify if > 2 GB can be set via config + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076); + long size = ExternalSorter.getInitialMemoryRequirement(conf, 3076); + Assert.assertTrue(size == (3076l << 20)); + + //Verify BLOCK_SIZEs + this.initialAvailableMem = 10 * 1024 * 1024; + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, + initialAvailableMem, 1 << 20); + Assert.assertTrue(sorter.bufferList.size() == 10); + + sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, + initialAvailableMem, 3 << 20); + Assert.assertTrue(sorter.bufferList.size() == 4); + + sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, + initialAvailableMem, 10 << 20); + Assert.assertTrue(sorter.bufferList.size() == 1); + } + + private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException { + sortedDataMap.clear(); + RandomDataGenerator generator = new RandomDataGenerator(); + for (int i = 0; i < numKeys; i++) { + Text key = new Text(generator.nextHexString(keyLen)); + Text value = new Text(generator.nextHexString(keyLen)); + sorter.write(key, value); + sortedDataMap.put(key.toString(), value.toString()); //for verifying data later + } + sorter.flush(); + sorter.close(); + } + + private void verifyData(IFile.Reader reader) + throws IOException { + Text readKey = new Text(); + Text readValue = new Text(); + DataInputBuffer keyIn = new DataInputBuffer(); + DataInputBuffer valIn = new DataInputBuffer(); + SerializationFactory serializationFactory = new SerializationFactory(conf); + Deserializer<Text> keyDeserializer = serializationFactory.getDeserializer(Text.class); + Deserializer<Text> valDeserializer = serializationFactory.getDeserializer(Text.class); + keyDeserializer.open(keyIn); + valDeserializer.open(valIn); + + int numRecordsRead = 0; + + for (Map.Entry<String, String> entry : sortedDataMap.entrySet()) { + String key = entry.getKey(); + String val = entry.getValue(); + if (reader.nextRawKey(keyIn)) { + reader.nextRawValue(valIn); + readKey = keyDeserializer.deserialize(readKey); + readValue = valDeserializer.deserialize(readValue); + Assert.assertTrue(key.equalsIgnoreCase(readKey.toString())); + Assert.assertTrue(val.equalsIgnoreCase(readValue.toString())); + numRecordsRead++; + } + } + Assert.assertTrue(numRecordsRead == sortedDataMap.size()); + } + + private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId, + String uniqueId) { + OutputContext outputContext = mock(OutputContext.class); + doReturn(counters).when(outputContext).getCounters(); + doReturn(appId).when(outputContext).getApplicationId(); + doReturn(1).when(outputContext).getDAGAttemptNumber(); + doReturn("dagName").when(outputContext).getDAGName(); + doReturn("destinationVertexName").when(outputContext).getDestinationVertexName(); + doReturn(1).when(outputContext).getOutputIndex(); + doReturn(1).when(outputContext).getTaskAttemptNumber(); + doReturn(1).when(outputContext).getTaskIndex(); + doReturn(1).when(outputContext).getTaskVertexIndex(); + doReturn("vertexName").when(outputContext).getTaskVertexName(); + doReturn(uniqueId).when(outputContext).getUniqueIdentifier(); + Path outDirBase = new Path(workDir, "outDir_" + uniqueId); + String[] outDirs = new String[] { outDirBase.toString() }; + doReturn(outDirs).when(outputContext).getWorkDirs(); + return outputContext; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index 16dca55..b6e3604 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -75,7 +75,7 @@ public class TestDefaultSorter { conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT, 0.0f); try { - new DefaultSorter(context, conf, 10, 2048); + new DefaultSorter(context, conf, 10, (10 * 1024 * 1024l)); fail(); } catch(IllegalArgumentException e) { assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT)); @@ -83,7 +83,7 @@ public class TestDefaultSorter { conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT, 1.1f); try { - new DefaultSorter(context, conf, 10, 2048); + new DefaultSorter(context, conf, 10, (10 * 1024 * 1024l)); fail(); } catch(IllegalArgumentException e) { assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT)); http://git-wip-us.apache.org/repos/asf/tez/blob/3f4e8a7b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index 0d1a9c0..b9ff7ef 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -36,6 +36,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.library.api.KeyValuesWriter; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; +import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; @@ -164,7 +165,9 @@ public class TestOnFileSortedOutput { doReturn(payLoad).when(context).getUserPayload(); sortedOutput = new OrderedPartitionedKVOutput(context, partitions); try { + //Memory limit checks are done in sorter impls. For e.g, defaultsorter does not support > 2GB sortedOutput.initialize(); + DefaultSorter sorter = new DefaultSorter(context, conf, 100, 3500*1024*1024l); fail(); } catch(IllegalArgumentException e) { assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB));
