Repository: tez
Updated Branches:
  refs/heads/master 12e1e6673 -> 4c28bdaa5


TEZ-1997. Remove synchronization DefaultSorter::isRLENeeded() (Causes sorter to 
hang indefinitely in large jobs) (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4c28bdaa
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4c28bdaa
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4c28bdaa

Branch: refs/heads/master
Commit: 4c28bdaa536f82b21184f26c80e3b108619bf16a
Parents: 12e1e66
Author: Rajesh Balamohan <[email protected]>
Authored: Tue Jan 27 13:56:21 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Tue Jan 27 13:56:21 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 tez-runtime-library/findbugs-exclude.xml        |  6 ++
 .../common/sort/impl/ExternalSorter.java        |  8 ++-
 .../common/sort/impl/dflt/DefaultSorter.java    |  2 +-
 .../common/sort/impl/TestPipelinedSorter.java   |  3 +-
 .../sort/impl/dflt/TestDefaultSorter.java       | 62 +++++++++++++++++++-
 .../library/output/TestOnFileSortedOutput.java  |  3 +
 7 files changed, 78 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7542c13..a3350bb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1997. Remove synchronization DefaultSorter::isRLENeeded() (Causes sorter to hang indefinitely in large jobs).
   TEZ-1996. Update Website after 0.6.0
   TEZ-1803. Support > 2gb sort buffer in pipelinedsorter.
   TEZ-1826. Add option to disable split grouping and local mode option for tez-examples.

http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml 
b/tez-runtime-library/findbugs-exclude.xml
index 45c194c..aa1c7a2 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -81,6 +81,12 @@
   </Match>
 
   <Match>
+    <Class 
name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/>
+    <Field name="totalKeys"/>
+    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+  </Match>
+
+  <Match>
     <Class 
name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto"/>
     <Field name="PARSER"/>
     <Bug pattern="MS_SHOULD_BE_FINAL"/>

http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index a1da36a..ca51890 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -294,10 +294,12 @@ public abstract class ExternalSorter {
         conf.getInt(
             TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 
             TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT);
-    //Higher bound checks are done in individual sorter implementations
-    Preconditions.checkArgument(initialMemRequestMb > 0,
-        TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " should be larger than 0");
     long reqBytes = ((long) initialMemRequestMb) << 20;
+    //Higher bound checks are done in individual sorter implementations
+    Preconditions.checkArgument(initialMemRequestMb > 0 && reqBytes < maxAvailableTaskMemory,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " " + initialMemRequestMb + " should be "
+            + "larger than 0 and should be less than the available task memory (MB):" +
+            (maxAvailableTaskMemory >> 20));
     LOG.info("Requested SortBufferSize ("
         + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): "
         + initialMemRequestMb);

http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 56a3f27..f8b038e 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -726,7 +726,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
         : kvmeta.capacity() + kvstart) / NMETA;
   }
 
-  private synchronized boolean isRLENeeded() {
+  private boolean isRLENeeded() {
     return (sameKey > (0.1 * totalKeys)) || (sameKey < 0);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 7ba0bf4..4595c18 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -101,6 +101,7 @@ public class TestPipelinedSorter {
     //TODO: need to support multiple partition testing later
 
     //# partition, # of keys, size per key, InitialMem, blockSize
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
   }
 
@@ -127,7 +128,7 @@ public class TestPipelinedSorter {
   public void memTest() throws IOException {
     //Verify if > 2 GB can be set via config
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076);
-    long size = ExternalSorter.getInitialMemoryRequirement(conf, 3076);
+    long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l);
     Assert.assertTrue(size == (3076l << 20));
 
     //Verify BLOCK_SIZEs

http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index b6e3604..1f5f67c 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -29,7 +29,9 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.math3.random.RandomDataGenerator;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -42,7 +44,9 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -51,16 +55,23 @@ import org.mockito.stubbing.Answer;
 public class TestDefaultSorter {
 
   private Configuration conf;
-  private Path workingDir;
   private static final int PORT = 80;
   private static final String UniqueID = "UUID";
 
+  private static FileSystem localFs = null;
+  private static Path workingDir = null;
+
   @Before
   public void setup() throws IOException {
     conf = new Configuration();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1); // DefaultSorter
+    conf.set("fs.defaultFS", "file:///");
+    localFs = FileSystem.getLocal(conf);
 
-    workingDir = new Path(".", this.getClass().getName());
+    workingDir = new Path(
+        new Path(System.getProperty("test.build.data", "/tmp")),
+        TestDefaultSorter.class.getName())
+        .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
     String localDirs = workingDir.toString();
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
@@ -69,6 +80,11 @@ public class TestDefaultSorter {
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
   }
 
+  @After
+  public void cleanup() throws IOException {
+    localFs.delete(workingDir, true);
+  }
+
   @Test(timeout = 5000)
   public void testSortSpillPercent() throws Exception {
     OutputContext context = createTezOutputContext();
@@ -90,6 +106,48 @@ public class TestDefaultSorter {
     }
   }
 
+  @Test(timeout = 30000)
+  //Test TEZ-1997
+  public void basicTest() throws IOException {
+    OutputContext context = createTezOutputContext();
+
+    MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
+    try {
+      //Setting IO_SORT_MB to greater than available mem limit (should throw exception)
+      conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 300);
+      context.requestInitialMemory(
+          ExternalSorter.getInitialMemoryRequirement(conf,
+              context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler());
+      fail();
+    } catch(IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB));
+    }
+
+    conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
+    context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
+            context.getTotalMemoryAvailableToTask()), handler);
+    DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned());
+
+    //Write 1000 keys each of size 1000 (more than 2 spills should happen)
+    try {
+      writeData(sorter, 1000, 1000);
+      assertTrue(sorter.numSpills > 2);
+    } catch(IOException ioe) {
+      fail(ioe.getMessage());
+    }
+  }
+
+  private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
+    RandomDataGenerator generator = new RandomDataGenerator();
+    for (int i = 0; i < numKeys; i++) {
+      Text key = new Text(generator.nextHexString(keyLen));
+      Text value = new Text(generator.nextHexString(keyLen));
+      sorter.write(key, value);
+    }
+    sorter.flush();
+    sorter.close();
+  }
+
   private OutputContext createTezOutputContext() throws IOException {
     String[] workingDirs = { workingDir.toString() };
     UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);

http://git-wip-us.apache.org/repos/asf/tez/blob/4c28bdaa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index b9ff7ef..4da62cb 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -151,6 +151,9 @@ public class TestOnFileSortedOutput {
 
   private void startSortedOutput(int partitions) throws Exception {
     OutputContext context = createTezOutputContext();
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+    doReturn(payLoad).when(context).getUserPayload();
     sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
     sortedOutput.initialize();
     sortedOutput.start();

Reply via email to