Repository: tez Updated Branches: refs/heads/master 539b0e129 -> 6f7591b8d
TEZ-2911. Null location Strings can cause problems with GroupedSplit serialization. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6f7591b8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6f7591b8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6f7591b8 Branch: refs/heads/master Commit: 6f7591b8d5e9b990c652e6dc7a7fb41dedd19179 Parents: 539b0e1 Author: Siddharth Seth <[email protected]> Authored: Fri Dec 11 14:06:09 2015 -0800 Committer: Siddharth Seth <[email protected]> Committed: Fri Dec 11 14:06:09 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/grouper/TezSplitGrouper.java | 34 ++++++++++- .../hadoop/mapred/split/TestGroupedSplits.java | 60 ++++++++++++++++++++ 3 files changed, 93 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6f7591b8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d02aa4f..675c2b3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job. ALL CHANGES: + TEZ-2911. Null location Strings can cause problems with GroupedSplit serialization. TEZ-2990. Change test-patch.sh to run through all tests, despite failures in upstream modules TEZ-2798. NPE when executing TestMemoryWithEvents::testMemoryScatterGather. TEZ-2963. RecoveryService#handleSummaryEvent exception with HDFS transparent encryption + kerberos authentication. 
http://git-wip-us.apache.org/repos/asf/tez/blob/6f7591b8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java index 848b06f..163a2a3 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -148,6 +149,8 @@ public abstract class TezSplitGrouper { return new HashMap<String, LocationHolder>(); } + + public List<GroupedSplitContainer> getGroupedSplits(Configuration conf, List<SplitContainer> originalSplits, int desiredNumSplits, @@ -233,10 +236,9 @@ public abstract class TezSplitGrouper { LOG.info("Using original number of splits: " + originalSplits.size() + " desired splits: " + desiredNumSplits); groupedSplits = new ArrayList<GroupedSplitContainer>(originalSplits.size()); - // TODO TEZ-2911 null in the non null String[] handled differently here compared to when grouping happens. 
for (SplitContainer split : originalSplits) { GroupedSplitContainer newSplit = - new GroupedSplitContainer(1, wrappedInputFormatName, locationProvider.getPreferredLocations(split), + new GroupedSplitContainer(1, wrappedInputFormatName, cleanupLocations(locationProvider.getPreferredLocations(split)), null); newSplit.addSplit(split); groupedSplits.add(newSplit); @@ -518,6 +520,34 @@ public abstract class TezSplitGrouper { return groupedSplits; } + private String[] cleanupLocations(String[] locations) { + if (locations == null || locations.length == 0) { + return null; + } + boolean nullLocationFound = false; + for (String location : locations) { + if (location == null) { + nullLocationFound = true; + break; + } + } + if (!nullLocationFound) { + return locations; + } else { + List<String> newLocations = new LinkedList<>(); + for (String location : locations) { + if (location != null) { + newLocations.add(location); + } + } + if (newLocations.size() == 0) { + return null; + } else { + return newLocations.toArray(new String[newLocations.size()]); + } + } + } + /** * Builder that can be used to configure grouping in Tez * http://git-wip-us.apache.org/repos/asf/tez/blob/6f7591b8/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java index 43776f7..140a09d 100644 --- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java +++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java @@ -549,6 +549,66 @@ public class TestGroupedSplits { } @SuppressWarnings({ "rawtypes", "unchecked" }) + // No grouping: desired split count equals the original split count + @Test(timeout=10000) + public void testGroupedSplitWithBadLocations2() throws IOException { + JobConf job = new JobConf(defaultConf); + InputFormat 
mockWrappedFormat = mock(InputFormat.class); + TezGroupedSplitsInputFormat<LongWritable , Text> format = + new TezGroupedSplitsInputFormat<LongWritable, Text>(); + format.setConf(job); + format.setInputFormat(mockWrappedFormat); + + // splits whose location arrays are null, all-null, valid, and partially null + String validLocation = "validLocation"; + String validLocation2 = "validLocation2"; + int numSplits = 5; + InputSplit[] mockSplits = new InputSplit[numSplits]; + InputSplit mockSplit1 = mock(InputSplit.class); + when(mockSplit1.getLength()).thenReturn(100*1000*1000L); + when(mockSplit1.getLocations()).thenReturn(null); + mockSplits[0] = mockSplit1; + InputSplit mockSplit2 = mock(InputSplit.class); + when(mockSplit2.getLength()).thenReturn(100*1000*1000L); + when(mockSplit2.getLocations()).thenReturn(new String[] {null}); + mockSplits[1] = mockSplit2; + InputSplit mockSplit3 = mock(InputSplit.class); + when(mockSplit3.getLength()).thenReturn(100*1000*1000L); + when(mockSplit3.getLocations()).thenReturn(new String[] {null, null}); + mockSplits[2] = mockSplit3; + InputSplit mockSplit4 = mock(InputSplit.class); + when(mockSplit4.getLength()).thenReturn(100*1000*1000L); + when(mockSplit4.getLocations()).thenReturn(new String[] {validLocation}); + mockSplits[3] = mockSplit4; + InputSplit mockSplit5 = mock(InputSplit.class); + when(mockSplit5.getLength()).thenReturn(100*1000*1000L); + when(mockSplit5.getLocations()).thenReturn(new String[] {validLocation, null, validLocation2}); + mockSplits[4] = mockSplit5; + + when(mockWrappedFormat.getSplits((JobConf)anyObject(), anyInt())).thenReturn(mockSplits); + + format.setDesiredNumberOfSplits(numSplits); + InputSplit[] splits = format.getSplits(job, 1); + Assert.assertEquals(numSplits, splits.length); + for (int i = 0 ; i < numSplits ; i++) { + TezGroupedSplit split = (TezGroupedSplit) splits[i]; + // no grouping happens, so every split wraps exactly one original split + Assert.assertEquals(1, split.wrappedSplits.size()); + if (i==3) { + 
Assert.assertEquals(1, 
split.getLocations().length); + Assert.assertEquals(validLocation, split.getLocations()[0]); + } else if (i==4) { + // null entries are dropped; the remaining locations keep their original order + Assert.assertArrayEquals(new String[] {validLocation, validLocation2}, split.getLocations()); + } else { + Assert.assertNull(split.getLocations()); + } + ByteArrayOutputStream bOut = new ByteArrayOutputStream(); + split.write(new DataOutputStream(bOut)); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test(timeout=10000) public void testGroupedSplitWithEstimator() throws IOException { JobConf job = new JobConf(defaultConf);
