Repository: tez Updated Branches: refs/heads/branch-0.7 a135493d4 -> cb52ce62a
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers (rbalamohan) (cherry picked from commit 2e621ed5067780bb5093851afddfbd571d813fa3) rebased patch for branch-0.7 attached in TEZ-2732. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cb52ce62 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cb52ce62 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cb52ce62 Branch: refs/heads/branch-0.7 Commit: cb52ce62adda0c6ed9fa64acc7750746886059f3 Parents: a135493 Author: Rajesh Balamohan <[email protected]> Authored: Sat Aug 22 03:03:29 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Sat Aug 22 03:20:17 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../common/sort/impl/dflt/DefaultSorter.java | 25 ++++-- .../sort/impl/dflt/TestDefaultSorter.java | 89 +++++++++++++++++++- 3 files changed, 110 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cb52ce62/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d41e476..10b6169 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option @@ -229,6 +230,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. 
LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2630. TezChild receives IP address instead of FQDN. @@ -438,6 +440,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2635. Limit number of attempts being downloaded in unordered fetch. http://git-wip-us.apache.org/repos/asf/tez/blob/cb52ce62/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 2cbb70a..4e83d2d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -122,6 +122,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { private final boolean finalMergeEnabled; private final boolean sendEmptyPartitionDetails; + public static final int MAX_IO_SORT_MB = 1800; + public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException { super(outputContext, conf, numOutputs, initialMemoryAvailable); @@ -211,13 +213,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { "=" + availableMemoryMB + ". 
It should be > 0"); } - if (availableMemoryMB > 2047) { + if (availableMemoryMB > MAX_IO_SORT_MB) { LOG.warn("Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + - "=" + availableMemoryMB + " to 2047 (max sort buffer size supported for DefaultSorter)"); + "=" + availableMemoryMB + " to " + MAX_IO_SORT_MB + + " (max sort buffer size supported forDefaultSorter)"); } - //cap sort buffer to 2047 for DefaultSorter. - return Math.min(2047, availableMemoryMB); + // cap sort buffer to MAX_IO_SORT_MB for DefaultSorter. + // Not using 2047 to avoid any ArrayIndexOutOfBoundsException in collect() phase. + return Math.min(MAX_IO_SORT_MB, availableMemoryMB); } @Override @@ -283,6 +287,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { // leave at least half the split buffer for serialization data // ensure that kvindex >= bufindex final int distkvi = distanceTo(bufindex, kvbidx); + /** + * Reason for capping sort buffer to MAX_IO_SORT_MB + * E.g. + * kvbuffer.length = 2146435072 (2047 MB) + * Corner case: bufindex=2026133899, kvbidx=523629312.
+ * distkvi = mod - i + j = 2146435072 - 2026133899 + 523629312 = 643930485 + * newPos = 2026133899 + max(.., min(643930485 / 2, 271128624)) = 2297262523 (this would + * overflow Integer.MAX_VALUE) + */ final int newPos = (bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, @@ -613,7 +626,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } // here, we know that we have sufficient space to write - if (bufindex + len > bufvoid) { + // int overflow possible with (bufindex + len) + long futureBufIndex = (long) bufindex + len; + if (futureBufIndex > bufvoid) { final int gaplen = bufvoid - bufindex; System.arraycopy(b, off, kvbuffer, bufindex, gaplen); len -= gaplen; http://git-wip-us.apache.org/repos/asf/tez/blob/cb52ce62/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index 70dce13..f6d339a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -31,14 +31,18 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import com.google.protobuf.ByteString; import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.NullWritable; import
org.apache.hadoop.io.Text; +import org.apache.hadoop.util.StringInterner; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.TaskCounter; @@ -60,6 +64,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; @@ -125,11 +130,91 @@ public class TestDefaultSorter { } } + + @Test + @Ignore + /** + * Disabling this, as this would need 2047 MB io.sort.mb for testing. + * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this. + */ + public void testSortLimitsWithSmallRecord() throws IOException { + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, NullWritable.class.getName()); + OutputContext context = createTezOutputContext(); + + doReturn(2800 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask(); + + //Setting IO_SORT_MB to 2047 MB + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047); + context.requestInitialMemory( + ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler()); + + DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20); + + //Reset key/value in conf back to Text for other test cases + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); + + int i = 0; + /** + * If io.sort.mb is not capped to 1800, this would end up throwing + * "java.lang.ArrayIndexOutOfBoundsException" after many spills. + * Intentionally made it as infinite loop.
+ */ + while (true) { + //test for the avg record size 2 (in lower spectrum) + Text key = new Text(i + ""); + sorter.write(key, NullWritable.get()); + i = (i + 1) % 10; + } + } + + @Test + @Ignore + /** + * Disabling this, as this would need 2047 MB io.sort.mb for testing. + * Provide > 2GB to JVM when running this test to avoid OOM in string generation. + * + * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this. + */ + public void testSortLimitsWithLargeRecords() throws IOException { + OutputContext context = createTezOutputContext(); + + doReturn(2800 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask(); + + //Setting IO_SORT_MB to 2047 MB + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047); + context.requestInitialMemory( + ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler()); + + DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20); + + int i = 0; + /** + * If io.sort.mb is not capped to 1800, this would end up throwing + * "java.lang.ArrayIndexOutOfBoundsException" after many spills. + * Intentionally made it as infinite loop. + */ + while (true) { + Text key = new Text(i + ""); + //Generate random size between 1 MB to 100 MB. 
+ int valSize = ThreadLocalRandom.current().nextInt(1 * 1024 * 1024, 100 * 1024 * 1024); + String val = StringInterner.weakIntern(StringUtils.repeat("v", valSize)); + sorter.write(key, new Text(val)); + i = (i + 1) % 10; + } + } + + @Test(timeout = 5000) public void testSortMBLimits() throws Exception { - assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(4096) == 2047); - assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(2047) == 2047); + assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB, + DefaultSorter.computeSortBufferSize(4096) == DefaultSorter.MAX_IO_SORT_MB); + assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB, + DefaultSorter.computeSortBufferSize(2047) == DefaultSorter.MAX_IO_SORT_MB); assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024) == 1024); try {
