Repository: tez
Updated Branches:
  refs/heads/branch-0.6 ed1631776 -> c27cc583e
TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if configured higher (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c27cc583 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c27cc583 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c27cc583 Branch: refs/heads/branch-0.6 Commit: c27cc583ed1783a89b42343ccf58989e136df903 Parents: ed16317 Author: Rajesh Balamohan <[email protected]> Authored: Sat Aug 22 06:22:55 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Sat Aug 22 06:22:55 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../common/sort/impl/ExternalSorter.java | 8 +++++--- .../common/sort/impl/dflt/DefaultSorter.java | 21 +++++++++++++++++++- .../sort/impl/dflt/TestDefaultSorter.java | 20 +++++++++++++++++++ 4 files changed, 47 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c27cc583/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index eefd735..a0dfcdb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if configured higher TEZ-2734. Add a test to verify the filename generated by OnDiskMerge TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters @@ -226,6 +227,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if configured higher TEZ-2734. Add a test to verify the filename generated by OnDiskMerge TEZ-2687. 
ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters http://git-wip-us.apache.org/repos/asf/tez/blob/c27cc583/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index 9aee53a..f653ad5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -286,10 +286,12 @@ public abstract class ExternalSorter { conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT); - Preconditions.checkArgument(initialMemRequestMb > 0 && initialMemRequestMb <= 2047, - TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB - + " should be larger than 0 and less than or equal to 2047"); long reqBytes = ((long) initialMemRequestMb) << 20; + //Higher bound checks are done in individual sorter implementations + Preconditions.checkArgument(initialMemRequestMb > 0 && reqBytes < maxAvailableTaskMemory, + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " " + initialMemRequestMb + " should be " + + "larger than 0 and should be less than the available task memory (MB):" + + (maxAvailableTaskMemory >> 20)); LOG.info("Requested SortBufferSize (" + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): " + initialMemRequestMb); http://git-wip-us.apache.org/repos/asf/tez/blob/c27cc583/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index f872e1f..c009923 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -123,7 +124,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { final float spillper = this.conf.getFloat( TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT, TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT); - final int sortmb = this.availableMemoryMb; + final int sortmb = computeSortBufferSize((int) availableMemoryMb); Preconditions.checkArgument(spillper <= (float) 1.0 && spillper > (float) 0.0, TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT + " should be greater than 0 and less than or equal to 1"); @@ -179,6 +180,24 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } + @VisibleForTesting + static int computeSortBufferSize(int availableMemoryMB) { + + if (availableMemoryMB <= 0) { + throw new RuntimeException(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + + "=" + availableMemoryMB + ". 
It should be > 0"); + } + + if (availableMemoryMB > 2047) { + LOG.warn("Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + + "=" + availableMemoryMB + " to 2047 (max sort buffer size supported for DefaultSorter)"); + } + + //cap sort buffer to 2047 for DefaultSorter. + return Math.min(2047, availableMemoryMB); + } + + @Override public void write(Object key, Object value) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/c27cc583/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index 16dca55..4d4c99a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -90,6 +90,26 @@ public class TestDefaultSorter { } } + @Test(timeout = 5000) + public void testSortMBLimits() throws Exception { + + assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(4096) == 2047); + assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(2047) == 2047); + assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024) == 1024); + + try { + DefaultSorter.computeSortBufferSize(0); + fail("Should have thrown error for setting buffer size to 0"); + } catch(RuntimeException re) { + } + + try { + DefaultSorter.computeSortBufferSize(-100); + fail("Should have thrown error for setting buffer size to negative value"); + } catch(RuntimeException re) { + } + } + private OutputContext createTezOutputContext() throws IOException { String[] workingDirs = { workingDir.toString() }; UserPayload payLoad = 
TezUtils.createUserPayloadFromConf(conf);
