Repository: tez Updated Branches: refs/heads/branch-0.5 9e234abe3 -> 92c84a381
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers (rbalamohan) (cherry picked from commit 253e8dee424d400495804794329f3670d8439fe0) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/92c84a38 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/92c84a38 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/92c84a38 Branch: refs/heads/branch-0.5 Commit: 92c84a381a91de19be14db757291a7d89eecdfef Parents: 9e234ab Author: Rajesh Balamohan <[email protected]> Authored: Sat Aug 22 06:36:55 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Sat Aug 22 06:39:44 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/sort/impl/dflt/DefaultSorter.java | 25 ++++-- .../sort/impl/dflt/TestDefaultSorter.java | 91 +++++++++++++++++++- 3 files changed, 110 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/92c84a38/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3a15447..98bbef5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if configured higher TEZ-2734. Add a test to verify the filename generated by OnDiskMerge TEZ-2687. ATS History shutdown happens before the min-held containers are released http://git-wip-us.apache.org/repos/asf/tez/blob/92c84a38/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index c009923..69a303c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -117,6 +117,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { private long totalKeys = 0; private long sameKey = 0; + public static final int MAX_IO_SORT_MB = 1800; + public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException { super(outputContext, conf, numOutputs, initialMemoryAvailable); @@ -188,13 +190,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { "=" + availableMemoryMB + ". It should be > 0"); } - if (availableMemoryMB > 2047) { + if (availableMemoryMB > MAX_IO_SORT_MB) { LOG.warn("Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + - "=" + availableMemoryMB + " to 2047 (max sort buffer size supported for DefaultSorter)"); + "=" + availableMemoryMB + " to " + MAX_IO_SORT_MB + + " (max sort buffer size supported forDefaultSorter)"); } - //cap sort buffer to 2047 for DefaultSorter. - return Math.min(2047, availableMemoryMB); + // cap sort buffer to MAX_IO_SORT_MB for DefaultSorter. + // Not using 2047 to avoid any ArrayIndexOutofBounds in collect() phase. + return Math.min(MAX_IO_SORT_MB, availableMemoryMB); } @@ -261,6 +265,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { // leave at least half the split buffer for serialization data // ensure that kvindex >= bufindex final int distkvi = distanceTo(bufindex, kvbidx); + /** + * Reason for capping sort buffer to MAX_IO_SORT_MB + * E.g + * kvbuffer.length = 2146435072 (2047 MB) + * Corner case: bufIndex=2026133899, kvbidx=523629312. + * distkvi = mod - i + j = 2146435072 - 2026133899 + 523629312 = 643930485 + * newPos = (2026133899 + (max(.., min(643930485/2, 271128624))) (This would + * overflow) + */ final int newPos = (bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, @@ -591,7 +604,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } // here, we know that we have sufficient space to write - if (bufindex + len > bufvoid) { + // int overflow possible with (bufindex + len) + long futureBufIndex = (long) bufindex + len; + if (futureBufIndex > bufvoid) { final int gaplen = bufvoid - bufindex; System.arraycopy(b, off, kvbuffer, bufindex, gaplen); len -= gaplen; http://git-wip-us.apache.org/repos/asf/tez/blob/92c84a38/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index 4d4c99a..654cfef 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -29,10 +29,14 @@ import static org.mockito.Mockito.mock; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.math3.random.RandomDataGenerator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.StringInterner; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.TezCounters; @@ -42,8 +46,10 @@ import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -90,11 +96,92 @@ public class TestDefaultSorter { } } + + @Test + @Ignore + /** + * Disabling this, as this would need 2047 MB sort mb for testing. + * Set DefaultSorter.MAX_IO_SORT_MB = 20467 for running this. + */ + public void testSortLimitsWithSmallRecord() throws IOException { + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, NullWritable.class.getName()); + OutputContext context = createTezOutputContext(); + + doReturn(2800 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask(); + + //Setting IO_SORT_MB to 2047 MB + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047); + context.requestInitialMemory( + ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler()); + + DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20); + + //Reset key/value in conf back to Text for other test cases + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); + + int i = 0; + /** + * If io.sort.mb is not capped to 1800, this would end up throwing + * "java.lang.ArrayIndexOutOfBoundsException" after many spills. + * Intentionally made it as infinite loop. + */ + while (true) { + //test for the avg record size 2 (in lower spectrum) + Text key = new Text(i + ""); + sorter.write(key, NullWritable.get()); + i = (i + 1) % 10; + } + } + + @Test + @Ignore + /** + * Disabling this, as this would need 2047 MB io.sort.mb for testing. + * Provide > 2GB to JVM when running this test to avoid OOM in string generation. + * + * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this. + */ + public void testSortLimitsWithLargeRecords() throws IOException { + OutputContext context = createTezOutputContext(); + + doReturn(2800 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask(); + + //Setting IO_SORT_MB to 2047 MB + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047); + context.requestInitialMemory( + ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler()); + + DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20); + + int i = 0; + /** + * If io.sort.mb is not capped to 1800, this would end up throwing + * "java.lang.ArrayIndexOutOfBoundsException" after many spills. + * Intentionally made it as infinite loop. + */ + RandomDataGenerator rnd = new RandomDataGenerator(); + while (true) { + Text key = new Text(i + ""); + //Generate random size between 1 MB to 100 MB. + int valSize = rnd.nextInt(1 * 1024 * 1024, 100 * 1024 * 1024); + String val = StringInterner.weakIntern(StringUtils.repeat("v", valSize)); + sorter.write(key, new Text(val)); + i = (i + 1) % 10; + } + } + + @Test(timeout = 5000) public void testSortMBLimits() throws Exception { - assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(4096) == 2047); - assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(2047) == 2047); + assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB, + DefaultSorter.computeSortBufferSize(4096) == DefaultSorter.MAX_IO_SORT_MB); + assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB, + DefaultSorter.computeSortBufferSize(2047) == DefaultSorter.MAX_IO_SORT_MB); assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024) == 1024); try {
