Repository: tez
Updated Branches:
  refs/heads/branch-0.9 fa6bc2acf -> 5bf3daddd


TEZ-3924. TestDefaultSorter fails intermittently due to random keys and 
interaction with RLE and partition collisions (Jonathan Eagles via kshukla)

(cherry picked from commit cf6ea5f62aa4a5957028e6c074a62747324abaa1)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5bf3dadd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5bf3dadd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5bf3dadd

Branch: refs/heads/branch-0.9
Commit: 5bf3daddd16faf4fab0f97c76ffc13b8bcb23491
Parents: fa6bc2a
Author: Kuhu Shukla <[email protected]>
Authored: Fri May 4 15:59:47 2018 -0500
Committer: Kuhu Shukla <[email protected]>
Committed: Fri May 4 15:59:47 2018 -0500

----------------------------------------------------------------------
 .../sort/impl/dflt/TestDefaultSorter.java       | 191 ++++++++++++++-----
 1 file changed, 140 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5bf3dadd/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index aad232a..e0fb153 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -19,7 +19,9 @@
 package org.apache.tez.runtime.library.common.sort.impl.dflt;
 
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.runtime.library.api.Partitioner;
 import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.junit.Assert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -269,11 +271,17 @@ public class TestDefaultSorter {
     conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
     
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
         context.getTotalMemoryAvailableToTask()), handler);
-    DefaultSorter sorter = new DefaultSorter(context, conf, 5, 
handler.getMemoryAssigned());
+    SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 5, 
handler.getMemoryAssigned());
+    DefaultSorter sorter = sorterWrapper.getSorter();
 
     //Write 1000 keys each of size 1000, (> 1 spill should happen)
     try {
-      writeData(sorter, 1000, 1000);
+      Text[] keys = generateData(1000, 1000);
+      Text[] values = generateData(1000, 1000);
+      for (int i = 0; i < keys.length; i++) {
+        sorterWrapper.writeKeyValue(keys[i], values[i]);
+      }
+      sorterWrapper.close();
       assertTrue(sorter.getNumSpills() > 2);
       verifyCounters(sorter, context);
     } catch(IOException ioe) {
@@ -285,11 +293,13 @@ public class TestDefaultSorter {
 
   @Test(timeout = 30000)
   public void testEmptyCaseFileLengths() throws IOException {
-    testEmptyCaseFileLengthsHelper(50, 2, 1, 48);
-    testEmptyCaseFileLengthsHelper(1, 1, 10, 0);
+    testEmptyCaseFileLengthsHelper(50, new String[] {"a", "b"}, new String[] 
{"1", "2"});
+    testEmptyCaseFileLengthsHelper(50, new String[] {"a", "a"}, new String[] 
{"1", "2"});
+    testEmptyCaseFileLengthsHelper(50, new String[] {"aaa", "bbb", "aaa"}, new 
String[] {"1", "2", "3"});
+    testEmptyCaseFileLengthsHelper(1, new String[] {"abcdefghij"}, new 
String[] {"1234567890"});
   }
 
-  public void testEmptyCaseFileLengthsHelper(int numPartitions, int numKeys, 
int keyLen, int expectedEmptyPartitions)
+  public void testEmptyCaseFileLengthsHelper(int numPartitions, String[] keys, 
String[] values)
       throws IOException {
     OutputContext context = createTezOutputContext();
 
@@ -298,39 +308,49 @@ public class TestDefaultSorter {
     
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
         context.getTotalMemoryAvailableToTask()), handler);
     String auxService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, 
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
-    DefaultSorter sorter = new DefaultSorter(context, conf, numPartitions, 
handler.getMemoryAssigned());
-    try {
-      writeData(sorter, numKeys, keyLen);
-      List<Event> events = new ArrayList<Event>();
-      String pathComponent = (context.getUniqueIdentifier() + "_" + 0);
-      ShuffleUtils.generateEventOnSpill(events, true, true, context, 0,
-          sorter.indexCacheList.get(0), 0, true, pathComponent, 
sorter.getPartitionStats(),
-          sorter.reportDetailedPartitionStats(), auxService, 
TezCommonUtils.newBestCompressionDeflater());
-
-      CompositeDataMovementEvent compositeDataMovementEvent =
-          (CompositeDataMovementEvent) events.get(1);
-      ByteBuffer bb = compositeDataMovementEvent.getUserPayload();
-      ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
-          
ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
-
-      if (shufflePayload.hasEmptyPartitions()) {
-        byte[] emptyPartitionsBytesString =
-            TezCommonUtils.decompressByteStringToByteArray(
-                shufflePayload.getEmptyPartitions());
-        BitSet emptyPartitionBitSet = 
TezUtilsInternal.fromByteArray(emptyPartitionsBytesString);
-        Assert.assertTrue("Number of empty partitions did not match!",
-            emptyPartitionBitSet.cardinality() == expectedEmptyPartitions);
-      } else {
-        Assert.assertTrue(expectedEmptyPartitions == 0);
-      }
-      //4 bytes of header + numKeys* 2 *(keydata.length + keyLength.length) + 
2 * 1 byte of EOF_MARKER + 4 bytes of checksum
-      assertEquals("Unexpected Output File Size!",
-          localFs.getFileStatus(sorter.getFinalOutputFile()).getLen(), numKeys 
* (4 + (2 * (2 + keyLen)) + 2 + 4));
-      assertTrue(sorter.getNumSpills()  == 1);
-      verifyCounters(sorter, context);
-    } catch(IOException ioe) {
-      fail(ioe.getMessage());
+    SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 
numPartitions, handler.getMemoryAssigned());
+    DefaultSorter sorter = sorterWrapper.getSorter();
+    assertEquals("Key and Values must have the same number of elements", 
keys.length, values.length);
+    BitSet keyRLEs = new BitSet(keys.length);
+    for (int i = 0; i < keys.length; i++) {
+      boolean isRLE = sorterWrapper.writeKeyValue(new Text(keys[i]), new 
Text(values[i]));
+      keyRLEs.set(i, isRLE);
+    }
+    sorterWrapper.close();
+
+    List<Event> events = new ArrayList<>();
+    String pathComponent = (context.getUniqueIdentifier() + "_" + 0);
+    ShuffleUtils.generateEventOnSpill(events, true, true, context, 0,
+        sorter.indexCacheList.get(0), 0, true, pathComponent, 
sorter.getPartitionStats(),
+        sorter.reportDetailedPartitionStats(), auxService, 
TezCommonUtils.newBestCompressionDeflater());
+
+    CompositeDataMovementEvent compositeDataMovementEvent =
+        (CompositeDataMovementEvent) events.get(1);
+    ByteBuffer bb = compositeDataMovementEvent.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        
ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+
+    if (shufflePayload.hasEmptyPartitions()) {
+      byte[] emptyPartitionsBytesString =
+          TezCommonUtils.decompressByteStringToByteArray(
+              shufflePayload.getEmptyPartitions());
+      BitSet emptyPartitionBitSet = 
TezUtilsInternal.fromByteArray(emptyPartitionsBytesString);
+      Assert.assertEquals("Number of empty partitions did not match!",
+          emptyPartitionBitSet.cardinality(), 
sorterWrapper.getEmptyPartitionsCount());
+    } else {
+      Assert.assertEquals(sorterWrapper.getEmptyPartitionsCount(), 0);
+    }
+    // Each non-empty partition adds 4 bytes for header, 2 bytes for 
EOF_MARKER, 4 bytes for checksum
+    int expectedFileOutLength = sorterWrapper.getNonEmptyPartitionsCount() * 
10;
+    for (int i = 0; i < keys.length; i++) {
+      // Each Record adds 1 byte for key length, 1 byte Text overhead 
(length), key.length bytes for key
+      expectedFileOutLength += keys[i].length() + 2;
+      // Each Record adds 1 byte for value length, 1 byte Text overhead 
(length), value.length bytes for value
+      expectedFileOutLength += values[i].length() + 2;
     }
+    assertEquals("Unexpected Output File Size!", 
localFs.getFileStatus(sorter.getFinalOutputFile()).getLen(), 
expectedFileOutLength);
+    assertEquals(sorter.getNumSpills(), 1);
+    verifyCounters(sorter, context);
   }
 
   @Test
@@ -396,9 +416,15 @@ public class TestDefaultSorter {
     
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
         context.getTotalMemoryAvailableToTask()), handler);
     int partitions = 50;
-    DefaultSorter sorter = new DefaultSorter(context, conf, partitions, 
handler.getMemoryAssigned());
+    SorterWrapper sorterWrapper = new SorterWrapper(context, conf, partitions, 
handler.getMemoryAssigned());
+    DefaultSorter sorter = sorterWrapper.getSorter();
 
-    writeData(sorter, numKeys, 1000000);
+    Text[] keys = generateData(numKeys, 1000000);
+    Text[] values = generateData(numKeys, 1000000);
+    for (int i = 0; i < keys.length; i++) {
+      sorterWrapper.writeKeyValue(keys[i], values[i]);
+    }
+    sorterWrapper.close();
     if (numKeys == 0) {
       assertTrue(sorter.getNumSpills() == 1);
     } else {
@@ -446,9 +472,15 @@ public class TestDefaultSorter {
     MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
     
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
         context.getTotalMemoryAvailableToTask()), handler);
-    DefaultSorter sorter = new DefaultSorter(context, conf, 1, 
handler.getMemoryAssigned());
+    SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, 
handler.getMemoryAssigned());
+    DefaultSorter sorter = sorterWrapper.getSorter();
 
-    writeData(sorter, 1000, 10);
+    Text[] keys = generateData(1000, 10);
+    Text[] values = generateData(1000, 10);
+    for (int i = 0; i < keys.length; i++) {
+      sorterWrapper.writeKeyValue(keys[i], values[i]);
+    }
+    sorterWrapper.close();
     assertTrue(sorter.getNumSpills() == 1);
     verifyCounters(sorter, context);
 
@@ -479,9 +511,16 @@ public class TestDefaultSorter {
     MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
     
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
         context.getTotalMemoryAvailableToTask()), handler);
-    DefaultSorter sorter = new DefaultSorter(context, conf, 1, 
handler.getMemoryAssigned());
 
-    writeData(sorter, 1000, 10);
+    SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, 
handler.getMemoryAssigned());
+    DefaultSorter sorter = sorterWrapper.getSorter();
+
+    Text[] keys = generateData(1000, 10);
+    Text[] values = generateData(1000, 10);
+    for (int i = 0; i < keys.length; i++) {
+      sorterWrapper.writeKeyValue(keys[i], values[i]);
+    }
+    sorterWrapper.close();
     assertTrue(sorter.getNumSpills() == 1);
     ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
     verify(context, times(1)).sendEvents(eventCaptor.capture());
@@ -510,9 +549,16 @@ public class TestDefaultSorter {
     MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
     
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
         context.getTotalMemoryAvailableToTask()), handler);
-    DefaultSorter sorter = new DefaultSorter(context, conf, 1, 
handler.getMemoryAssigned());
+    SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, 
handler.getMemoryAssigned());
+    DefaultSorter sorter = sorterWrapper.getSorter();
+
+    Text[] keys = generateData(10000, 1000);
+    Text[] values = generateData(10000, 1000);
+    for (int i = 0; i < keys.length; i++) {
+      sorterWrapper.writeKeyValue(keys[i], values[i]);
+    }
+    sorterWrapper.close();
 
-    writeData(sorter, 10000, 1000);
     int spillCount = sorter.getNumSpills();
     ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
     verify(context, times(1)).sendEvents(eventCaptor.capture());
@@ -574,14 +620,57 @@ public class TestDefaultSorter {
     verify(context, atLeastOnce()).notifyProgress();
   }
 
-  private void writeData(ExternalSorter sorter, int numKeys, int keyLen) 
throws IOException {
-    for (int i = 0; i < numKeys; i++) {
-      Text key = new Text(RandomStringUtils.randomAlphanumeric(keyLen));
-      Text value = new Text(RandomStringUtils.randomAlphanumeric(keyLen));
+  private static class SorterWrapper {
+
+    private final DefaultSorter sorter;
+    private final Partitioner partitioner;
+    private final BitSet nonEmptyPartitions;
+    private final Object[] lastKeys;
+    private final int numPartitions;
+
+
+    public SorterWrapper(OutputContext context, Configuration conf, int 
numPartitions, long memoryAssigned) throws IOException {
+      sorter = new DefaultSorter(context, conf, numPartitions, memoryAssigned);
+      partitioner = TezRuntimeUtils.instantiatePartitioner(conf);
+      nonEmptyPartitions = new BitSet(numPartitions);
+      lastKeys = new Object[numPartitions];
+      this.numPartitions = numPartitions;
+    }
+
+    public boolean writeKeyValue(Object key, Object value) throws IOException {
+      int partition = partitioner.getPartition(key, value, this.numPartitions);
+      nonEmptyPartitions.set(partition);
       sorter.write(key, value);
+
+      boolean isRLE = key.equals(lastKeys[partition]);
+      lastKeys[partition] = key;
+      return isRLE;
+    }
+
+    public int getNonEmptyPartitionsCount() {
+      return nonEmptyPartitions.cardinality();
+    }
+
+    public int getEmptyPartitionsCount() {
+      return numPartitions - nonEmptyPartitions.cardinality();
+    }
+
+    public void close () throws IOException {
+      sorter.flush();
+      sorter.close();
+    }
+
+    public DefaultSorter getSorter() {
+      return sorter;
+    }
+  }
+
+  private static Text[] generateData(int numKeys, int keyLen) {
+    Text[] ret = new Text[numKeys];
+    for (int i = 0; i < numKeys; i++) {
+      ret[i] = new Text(RandomStringUtils.randomAlphanumeric(keyLen));
     }
-    sorter.flush();
-    sorter.close();
+    return ret;
   }
 
   private OutputContext createTezOutputContext() throws IOException {

Reply via email to