Repository: tez Updated Branches: refs/heads/master c5c26c655 -> cf6ea5f62
TEZ-3924. TestDefaultSorter fails intermittently due random keys and interaction with RLE and partition collisions (Jonathan Eagles via kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cf6ea5f6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cf6ea5f6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cf6ea5f6 Branch: refs/heads/master Commit: cf6ea5f62aa4a5957028e6c074a62747324abaa1 Parents: c5c26c6 Author: Kuhu Shukla <[email protected]> Authored: Fri May 4 15:47:41 2018 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Fri May 4 15:47:41 2018 -0500 ---------------------------------------------------------------------- .../sort/impl/dflt/TestDefaultSorter.java | 191 ++++++++++++++----- 1 file changed, 140 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cf6ea5f6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index aad232a..e0fb153 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -19,7 +19,9 @@ package org.apache.tez.runtime.library.common.sort.impl.dflt; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.tez.runtime.library.api.Partitioner; import org.apache.tez.runtime.library.common.Constants; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.junit.Assert; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -269,11 +271,17 @@ public class TestDefaultSorter { conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1); context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, context.getTotalMemoryAvailableToTask()), handler); - DefaultSorter sorter = new DefaultSorter(context, conf, 5, handler.getMemoryAssigned()); + SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 5, handler.getMemoryAssigned()); + DefaultSorter sorter = sorterWrapper.getSorter(); //Write 1000 keys each of size 1000, (> 1 spill should happen) try { - writeData(sorter, 1000, 1000); + Text[] keys = generateData(1000, 1000); + Text[] values = generateData(1000, 1000); + for (int i = 0; i < keys.length; i++) { + sorterWrapper.writeKeyValue(keys[i], values[i]); + } + sorterWrapper.close(); assertTrue(sorter.getNumSpills() > 2); verifyCounters(sorter, context); } catch(IOException ioe) { @@ -285,11 +293,13 @@ public class TestDefaultSorter { @Test(timeout = 30000) public void testEmptyCaseFileLengths() throws IOException { - testEmptyCaseFileLengthsHelper(50, 2, 1, 48); - testEmptyCaseFileLengthsHelper(1, 1, 10, 0); + testEmptyCaseFileLengthsHelper(50, new String[] {"a", "b"}, new String[] {"1", "2"}); + testEmptyCaseFileLengthsHelper(50, new String[] {"a", "a"}, new String[] {"1", "2"}); + testEmptyCaseFileLengthsHelper(50, new String[] {"aaa", "bbb", "aaa"}, new String[] {"1", "2", "3"}); + testEmptyCaseFileLengthsHelper(1, new String[] {"abcdefghij"}, new String[] {"1234567890"}); } - public void testEmptyCaseFileLengthsHelper(int numPartitions, int numKeys, int keyLen, int expectedEmptyPartitions) + public void testEmptyCaseFileLengthsHelper(int numPartitions, String[] keys, String[] values) throws IOException { OutputContext context = createTezOutputContext(); @@ -298,39 +308,49 @@ public class TestDefaultSorter { context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, context.getTotalMemoryAvailableToTask()), handler); String auxService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); - DefaultSorter sorter = new DefaultSorter(context, conf, numPartitions, handler.getMemoryAssigned()); - try { - writeData(sorter, numKeys, keyLen); - List<Event> events = new ArrayList<Event>(); - String pathComponent = (context.getUniqueIdentifier() + "_" + 0); - ShuffleUtils.generateEventOnSpill(events, true, true, context, 0, - sorter.indexCacheList.get(0), 0, true, pathComponent, sorter.getPartitionStats(), - sorter.reportDetailedPartitionStats(), auxService, TezCommonUtils.newBestCompressionDeflater()); - - CompositeDataMovementEvent compositeDataMovementEvent = - (CompositeDataMovementEvent) events.get(1); - ByteBuffer bb = compositeDataMovementEvent.getUserPayload(); - ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = - ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb)); - - if (shufflePayload.hasEmptyPartitions()) { - byte[] emptyPartitionsBytesString = - TezCommonUtils.decompressByteStringToByteArray( - shufflePayload.getEmptyPartitions()); - BitSet emptyPartitionBitSet = TezUtilsInternal.fromByteArray(emptyPartitionsBytesString); - Assert.assertTrue("Number of empty partitions did not match!", - emptyPartitionBitSet.cardinality() == expectedEmptyPartitions); - } else { - Assert.assertTrue(expectedEmptyPartitions == 0); - } - //4 bytes of header + numKeys* 2 *(keydata.length + keyLength.length) + 2 * 1 byte of EOF_MARKER + 4 bytes of checksum - assertEquals("Unexpected Output File Size!", - localFs.getFileStatus(sorter.getFinalOutputFile()).getLen(), numKeys * (4 + (2 * (2 + keyLen)) + 2 + 4)); - assertTrue(sorter.getNumSpills() == 1); - verifyCounters(sorter, context); - } catch(IOException ioe) { - fail(ioe.getMessage()); + SorterWrapper sorterWrapper = new SorterWrapper(context, conf, numPartitions, handler.getMemoryAssigned()); + DefaultSorter sorter = sorterWrapper.getSorter(); + assertEquals("Key and Values must have the same number of elements", keys.length, values.length); + BitSet keyRLEs = new BitSet(keys.length); + for (int i = 0; i < keys.length; i++) { + boolean isRLE = sorterWrapper.writeKeyValue(new Text(keys[i]), new Text(values[i])); + keyRLEs.set(i, isRLE); + } + sorterWrapper.close(); + + List<Event> events = new ArrayList<>(); + String pathComponent = (context.getUniqueIdentifier() + "_" + 0); + ShuffleUtils.generateEventOnSpill(events, true, true, context, 0, + sorter.indexCacheList.get(0), 0, true, pathComponent, sorter.getPartitionStats(), + sorter.reportDetailedPartitionStats(), auxService, TezCommonUtils.newBestCompressionDeflater()); + + CompositeDataMovementEvent compositeDataMovementEvent = + (CompositeDataMovementEvent) events.get(1); + ByteBuffer bb = compositeDataMovementEvent.getUserPayload(); + ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = + ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb)); + + if (shufflePayload.hasEmptyPartitions()) { + byte[] emptyPartitionsBytesString = + TezCommonUtils.decompressByteStringToByteArray( + shufflePayload.getEmptyPartitions()); + BitSet emptyPartitionBitSet = TezUtilsInternal.fromByteArray(emptyPartitionsBytesString); + Assert.assertEquals("Number of empty partitions did not match!", + emptyPartitionBitSet.cardinality(), sorterWrapper.getEmptyPartitionsCount()); + } else { + Assert.assertEquals(sorterWrapper.getEmptyPartitionsCount(), 0); + } + // Each non-empty partition adds 4 bytes for header, 2 bytes for EOF_MARKER, 4 bytes for checksum + int expectedFileOutLength = sorterWrapper.getNonEmptyPartitionsCount() * 10; + for (int i = 0; i < keys.length; i++) { + // Each Record adds 1 byte for key length, 1 byte Text overhead (length), key.length bytes for key + expectedFileOutLength += keys[i].length() + 2; + // Each Record adds 1 byte for value length, 1 byte Text overhead (length), value.length bytes for value + expectedFileOutLength += values[i].length() + 2; } + assertEquals("Unexpected Output File Size!", localFs.getFileStatus(sorter.getFinalOutputFile()).getLen(), expectedFileOutLength); + assertEquals(sorter.getNumSpills(), 1); + verifyCounters(sorter, context); } @Test @@ -396,9 +416,15 @@ public class TestDefaultSorter { context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, context.getTotalMemoryAvailableToTask()), handler); int partitions = 50; - DefaultSorter sorter = new DefaultSorter(context, conf, partitions, handler.getMemoryAssigned()); + SorterWrapper sorterWrapper = new SorterWrapper(context, conf, partitions, handler.getMemoryAssigned()); + DefaultSorter sorter = sorterWrapper.getSorter(); - writeData(sorter, numKeys, 1000000); + Text[] keys = generateData(numKeys, 1000000); + Text[] values = generateData(numKeys, 1000000); + for (int i = 0; i < keys.length; i++) { + sorterWrapper.writeKeyValue(keys[i], values[i]); + } + sorterWrapper.close(); if (numKeys == 0) { assertTrue(sorter.getNumSpills() == 1); } else { @@ -446,9 +472,15 @@ public class TestDefaultSorter { MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, context.getTotalMemoryAvailableToTask()), handler); - DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned()); + SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned()); + DefaultSorter sorter = sorterWrapper.getSorter(); - writeData(sorter, 1000, 10); + Text[] keys = generateData(1000, 10); + Text[] values = generateData(1000, 10); + for (int i = 0; i < keys.length; i++) { + sorterWrapper.writeKeyValue(keys[i], values[i]); + } + sorterWrapper.close(); assertTrue(sorter.getNumSpills() == 1); verifyCounters(sorter, context); @@ -479,9 +511,16 @@ public class TestDefaultSorter { MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, context.getTotalMemoryAvailableToTask()), handler); - DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned()); - writeData(sorter, 1000, 10); + SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned()); + DefaultSorter sorter = sorterWrapper.getSorter(); + + Text[] keys = generateData(1000, 10); + Text[] values = generateData(1000, 10); + for (int i = 0; i < keys.length; i++) { + sorterWrapper.writeKeyValue(keys[i], values[i]); + } + sorterWrapper.close(); assertTrue(sorter.getNumSpills() == 1); ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class); verify(context, times(1)).sendEvents(eventCaptor.capture()); @@ -510,9 +549,16 @@ public class TestDefaultSorter { MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, context.getTotalMemoryAvailableToTask()), handler); - DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned()); + SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned()); + DefaultSorter sorter = sorterWrapper.getSorter(); + + Text[] keys = generateData(10000, 1000); + Text[] values = generateData(10000, 1000); + for (int i = 0; i < keys.length; i++) { + sorterWrapper.writeKeyValue(keys[i], values[i]); + } + sorterWrapper.close(); - writeData(sorter, 10000, 1000); int spillCount = sorter.getNumSpills(); ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class); verify(context, times(1)).sendEvents(eventCaptor.capture()); @@ -574,14 +620,57 @@ public class TestDefaultSorter { verify(context, atLeastOnce()).notifyProgress(); } - private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException { - for (int i = 0; i < numKeys; i++) { - Text key = new Text(RandomStringUtils.randomAlphanumeric(keyLen)); - Text value = new Text(RandomStringUtils.randomAlphanumeric(keyLen)); + private static class SorterWrapper { + + private final DefaultSorter sorter; + private final Partitioner partitioner; + private final BitSet nonEmptyPartitions; + private final Object[] lastKeys; + private final int numPartitions; + + + public SorterWrapper(OutputContext context, Configuration conf, int numPartitions, long memoryAssigned) throws IOException { + sorter = new DefaultSorter(context, conf, numPartitions, memoryAssigned); + partitioner = TezRuntimeUtils.instantiatePartitioner(conf); + nonEmptyPartitions = new BitSet(numPartitions); + lastKeys = new Object[numPartitions]; + this.numPartitions = numPartitions; + } + + public boolean writeKeyValue(Object key, Object value) throws IOException { + int partition = partitioner.getPartition(key, value, this.numPartitions); + nonEmptyPartitions.set(partition); sorter.write(key, value); + + boolean isRLE = key.equals(lastKeys[partition]); + lastKeys[partition] = key; + return isRLE; + } + + public int getNonEmptyPartitionsCount() { + return nonEmptyPartitions.cardinality(); + } + + public int getEmptyPartitionsCount() { + return numPartitions - nonEmptyPartitions.cardinality(); + } + + public void close () throws IOException { + sorter.flush(); + sorter.close(); + } + + public DefaultSorter getSorter() { + return sorter; + } + } + + private static Text[] generateData(int numKeys, int keyLen) { + Text[] ret = new Text[numKeys]; + for (int i = 0; i < numKeys; i++) { + ret[i] = new Text(RandomStringUtils.randomAlphanumeric(keyLen)); } - sorter.flush(); - sorter.close(); + return ret; } private OutputContext createTezOutputContext() throws IOException {
