Repository: tez Updated Branches: refs/heads/master 12e1e6673 -> 4c28bdaa5
TEZ-1997. Remove synchronization DefaultSorter::isRLENeeded() (Causes sorter to hang indefinitely in large jobs) (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4c28bdaa Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4c28bdaa Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4c28bdaa Branch: refs/heads/master Commit: 4c28bdaa536f82b21184f26c80e3b108619bf16a Parents: 12e1e66 Author: Rajesh Balamohan <[email protected]> Authored: Tue Jan 27 13:56:21 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue Jan 27 13:56:21 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + tez-runtime-library/findbugs-exclude.xml | 6 ++ .../common/sort/impl/ExternalSorter.java | 8 ++- .../common/sort/impl/dflt/DefaultSorter.java | 2 +- .../common/sort/impl/TestPipelinedSorter.java | 3 +- .../sort/impl/dflt/TestDefaultSorter.java | 62 +++++++++++++++++++- .../library/output/TestOnFileSortedOutput.java | 3 + 7 files changed, 78 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7542c13..a3350bb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1997. Remove synchronization DefaultSorter::isRLENeeded() (Causes sorter to hang indefinitely in large jobs). TEZ-1996. Update Website after 0.6.0 TEZ-1803. Support > 2gb sort buffer in pipelinedsorter. TEZ-1826. Add option to disable split grouping and local mode option for tez-examples. 
http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml index 45c194c..aa1c7a2 100644 --- a/tez-runtime-library/findbugs-exclude.xml +++ b/tez-runtime-library/findbugs-exclude.xml @@ -81,6 +81,12 @@ </Match> <Match> + <Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/> + <Field name="totalKeys"/> + <Bug pattern="IS2_INCONSISTENT_SYNC"/> + </Match> + + <Match> <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto"/> <Field name="PARSER"/> <Bug pattern="MS_SHOULD_BE_FINAL"/> http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index a1da36a..ca51890 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -294,10 +294,12 @@ public abstract class ExternalSorter { conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT); - //Higher bound checks are done in individual sorter implementations - Preconditions.checkArgument(initialMemRequestMb > 0, - TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " should be larger than 0"); long reqBytes = ((long) initialMemRequestMb) << 20; + //Higher bound checks are done in individual sorter implementations + 
Preconditions.checkArgument(initialMemRequestMb > 0 && reqBytes < maxAvailableTaskMemory, + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " " + initialMemRequestMb + " should be " + + "larger than 0 and should be less than the available task memory (MB):" + + (maxAvailableTaskMemory >> 20)); LOG.info("Requested SortBufferSize (" + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): " + initialMemRequestMb); http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 56a3f27..f8b038e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -726,7 +726,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { : kvmeta.capacity() + kvstart) / NMETA; } - private synchronized boolean isRLENeeded() { + private boolean isRLENeeded() { return (sameKey > (0.1 * totalKeys)) || (sameKey < 0); } http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 7ba0bf4..4595c18 100644 --- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -101,6 +101,7 @@ public class TestPipelinedSorter { //TODO: need to support multiple partition testing later //# partition, # of keys, size per key, InitialMem, blockSize + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20); } @@ -127,7 +128,7 @@ public class TestPipelinedSorter { public void memTest() throws IOException { //Verify if > 2 GB can be set via config conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076); - long size = ExternalSorter.getInitialMemoryRequirement(conf, 3076); + long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l); Assert.assertTrue(size == (3076l << 20)); //Verify BLOCK_SIZEs http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index b6e3604..1f5f67c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -29,7 +29,9 @@ import static org.mockito.Mockito.mock; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.commons.math3.random.RandomDataGenerator; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; @@ -42,7 +44,9 @@ import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter; import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -51,16 +55,23 @@ import org.mockito.stubbing.Answer; public class TestDefaultSorter { private Configuration conf; - private Path workingDir; private static final int PORT = 80; private static final String UniqueID = "UUID"; + private static FileSystem localFs = null; + private static Path workingDir = null; + @Before public void setup() throws IOException { conf = new Configuration(); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1); // DefaultSorter + conf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(conf); - workingDir = new Path(".", this.getClass().getName()); + workingDir = new Path( + new Path(System.getProperty("test.build.data", "/tmp")), + TestDefaultSorter.class.getName()) + .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); String localDirs = workingDir.toString(); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); @@ -69,6 +80,11 @@ public class TestDefaultSorter { conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs); } + @After + public void cleanup() throws IOException { + localFs.delete(workingDir, true); + } + @Test(timeout = 5000) public void testSortSpillPercent() throws Exception { OutputContext context = createTezOutputContext(); @@ -90,6 +106,48 @@ public class 
TestDefaultSorter { } } + @Test(timeout = 30000) + //Test TEZ-1997 + public void basicTest() throws IOException { + OutputContext context = createTezOutputContext(); + + MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); + try { + //Setting IO_SORT_MB to greater than available mem limit (should throw exception) + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 300); + context.requestInitialMemory( + ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler()); + fail(); + } catch(IllegalArgumentException e) { + assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB)); + } + + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1); + context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), handler); + DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned()); + + //Write 1000 keys each of size 1000, (> 2 spills should happen) + try { + writeData(sorter, 1000, 1000); + assertTrue(sorter.numSpills > 2); + } catch(IOException ioe) { + fail(ioe.getMessage()); + } + } + + private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException { + RandomDataGenerator generator = new RandomDataGenerator(); + for (int i = 0; i < numKeys; i++) { + Text key = new Text(generator.nextHexString(keyLen)); + Text value = new Text(generator.nextHexString(keyLen)); + sorter.write(key, value); + } + sorter.flush(); + sorter.close(); + } + private OutputContext createTezOutputContext() throws IOException { String[] workingDirs = { workingDir.toString() }; UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index b9ff7ef..4da62cb 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -151,6 +151,9 @@ public class TestOnFileSortedOutput { private void startSortedOutput(int partitions) throws Exception { OutputContext context = createTezOutputContext(); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); + UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); + doReturn(payLoad).when(context).getUserPayload(); sortedOutput = new OrderedPartitionedKVOutput(context, partitions); sortedOutput.initialize(); sortedOutput.start();
