Repository: tez Updated Branches: refs/heads/master f6ea0fb33 -> c411e4edc
TEZ-2405. PipelinedSorter can throw NPE with custom compartor (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c411e4ed Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c411e4ed Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c411e4ed Branch: refs/heads/master Commit: c411e4edced690d111dac3cf2afcbb6cd39354f4 Parents: f6ea0fb Author: Rajesh Balamohan <[email protected]> Authored: Mon May 4 12:38:34 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Mon May 4 12:38:34 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/sort/impl/PipelinedSorter.java | 2 +- .../common/sort/impl/TestPipelinedSorter.java | 53 +++++++++++++++++--- 3 files changed, 47 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c411e4ed/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8108ac8..6c19770 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2405. PipelinedSorter can throw NPE with custom compartor. TEZ-1897. Create a concurrent version of AsyncDispatcher TEZ-2394. Issues when there is an error in VertexManager callbacks TEZ-2386. 
Tez UI: Inconsistent usage of icon colors http://git-wip-us.apache.org/repos/asf/tez/blob/c411e4ed/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 65606bf..661f54c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -749,7 +749,7 @@ public class PipelinedSorter extends ExternalSorter { cmp = comparator.compare(buf, keystart + off , (valstart - keystart), needle.getData(), - needle.getPosition(), needle.getLength()); + needle.getPosition(), (needle.getLength() - needle.getPosition())); } return cmp; } http://git-wip-us.apache.org/repos/asf/tez/blob/c411e4ed/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 6e56567..5de96c9 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -8,6 +8,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparator; import 
org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -22,6 +23,7 @@ import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -61,10 +63,10 @@ public class TestPipelinedSorter { private static final Configuration conf = new Configuration(); private static FileSystem localFs = null; private static Path workDir = null; + private OutputContext outputContext; private int numOutputs; private long initialAvailableMem; - private OutputContext outputContext; //TODO: Need to make it nested structure so that multiple partition cases can be validated private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap(); @@ -82,6 +84,11 @@ public class TestPipelinedSorter { } } + @AfterClass + public static void cleanup() throws IOException { + localFs.delete(workDir, true); + } + @Before public void setup() throws IOException { ApplicationId appId = ApplicationId.newInstance(10000, 1); @@ -89,12 +96,14 @@ public class TestPipelinedSorter { String uniqueId = UUID.randomUUID().toString(); this.outputContext = createMockOutputContext(counters, appId, uniqueId); - //To enable PipelinedSorter, set 2 threads + //To enable PipelinedSorter conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); - conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, - HashPartitioner.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, 
HashPartitioner.class.getName()); + + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); //Setup localdirs String localDirs = workDir.toString(); @@ -102,9 +111,8 @@ public class TestPipelinedSorter { } @After - public void cleanup() throws IOException { - localFs.delete(workDir, true); - sortedDataMap.clear(); + public void reset() throws IOException { + cleanup(); localFs.mkdirs(workDir); } @@ -133,6 +141,13 @@ public class TestPipelinedSorter { } @Test + public void testWithCustomComparator() throws IOException { + //Test with custom comparator + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName()); + basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20); + } + + @Test public void testWithPipelinedShuffle() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 5 *1024 * 1024; @@ -251,7 +266,7 @@ public class TestPipelinedSorter { Assert.assertTrue(numRecordsRead == sortedDataMap.size()); } - private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId, + private static OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId, String uniqueId) throws IOException { OutputContext outputContext = mock(OutputContext.class); @@ -280,4 +295,26 @@ public class TestPipelinedSorter { doReturn(outDirs).when(outputContext).getWorkDirs(); return outputContext; } + + /** + * E.g. Hive uses TezBytesComparator which internally makes use of WritableComparator's comparison. + * Any length mismatches are handled there. + * + * However, custom comparators can handle this differently and might throw + * IndexOutOfBoundsException in case of invalid lengths. + * + * This comparator (similar to comparator in BinInterSedes of pig) would throw an exception when + * wrong lengths are passed. 
+ */ + public static class CustomComparator extends WritableComparator { + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + //wrapping is done so that it would throw exceptions on wrong lengths + ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1); + ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2); + + return bb1.compareTo(bb2); + } + + } }
