TEZ-2978. Add an option to allow the SplitGrouper to generate node local only groups. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0c085771 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0c085771 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0c085771 Branch: refs/heads/TEZ-2980 Commit: 0c085771b8c501d4fa492ab7c9dc57a1abcae52b Parents: 85637c6 Author: Siddharth Seth <[email protected]> Authored: Tue Jan 12 15:16:31 2016 -0800 Committer: Siddharth Seth <[email protected]> Committed: Tue Jan 12 15:16:31 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/grouper/TezSplitGrouper.java | 33 +++- .../hadoop/mapred/split/TestGroupedSplits.java | 192 ++++++++++++++----- 3 files changed, 180 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/0c085771/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ea2b1d5..6cdc037 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-2978. Add an option to allow the SplitGrouper to generate node local only groups. TEZ-2129. Task and Attempt views should contain links to the logs TEZ-3025. InputInitializer creation should use the dag ugi. TEZ-3017. HistoryACLManager does not have a close method for cleanup http://git-wip-us.apache.org/repos/asf/tez/blob/0c085771/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java index 163a2a3..9435e68 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java @@ -94,6 +94,13 @@ public abstract class TezSplitGrouper { public static final String TEZ_GROUPING_REPEATABLE = "tez.grouping.repeatable"; public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = true; + /** + * Generate node local splits only. This prevents fallback to rack locality etc, and overrides + * the target size for small splits. + */ + public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only"; + public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false; + static class LocationHolder { List<SplitContainer> splits; @@ -302,6 +309,9 @@ public abstract class TezSplitGrouper { boolean groupByCount = conf.getBoolean( TEZ_GROUPING_SPLIT_BY_COUNT, TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT); + boolean nodeLocalOnly = conf.getBoolean( + TEZ_GROUPING_NODE_LOCAL_ONLY, + TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT); if (!(groupByLength || groupByCount)) { throw new TezUncheckedException( "None of the grouping parameters are true: " @@ -315,7 +325,9 @@ public abstract class TezSplitGrouper { " numSplitsInGroup: " + numSplitsInGroup + " totalLength: " + totalLength + " numOriginalSplits: " + originalSplits.size() + - " . Grouping by length: " + groupByLength + " count: " + groupByCount); + " . Grouping by length: " + groupByLength + + " count: " + groupByCount + + " nodeLocalOnly: " + nodeLocalOnly); // go through locations and group splits int splitsProcessed = 0; @@ -332,7 +344,6 @@ public abstract class TezSplitGrouper { groupLocationSet.clear(); String location = entry.getKey(); LocationHolder holder = entry.getValue(); - // KKK rename to splitContainer SplitContainer splitContainer = holder.getUnprocessedHeadSplit(); if (splitContainer == null) { // all splits on node processed @@ -402,7 +413,18 @@ public abstract class TezSplitGrouper { } if (!doingRackLocal && numFullGroupsCreated < 1) { - // no node could create a node-local group. go rack-local + // no node could create a regular node-local group. + + // Allow small groups if that is configured. + if (nodeLocalOnly && !allowSmallGroups) { + LOG.info( + "Allowing small groups early after attempting to create full groups at iteration: {}, groupsCreatedSoFar={}", + iterations, groupedSplits.size()); + allowSmallGroups = true; + continue; + } + + // else go rack-local doingRackLocal = true; // re-create locations int numRemainingSplits = originalSplits.size() - splitsProcessed; @@ -601,6 +623,11 @@ public abstract class TezSplitGrouper { return this; } + public TezMRSplitsGrouperConfigBuilder setNodeLocalGroupsOnly(boolean nodeLocalGroupsOnly) { + this.conf.setBoolean(TEZ_GROUPING_NODE_LOCAL_ONLY, nodeLocalGroupsOnly); + return this; + } + /** * upper and lower bounds for the splits */ http://git-wip-us.apache.org/repos/asf/tez/blob/0c085771/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java index 140a09d..fba72a3 100644 --- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java +++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java @@ -28,10 +28,13 @@ import java.io.OutputStreamWriter; import java.io.Writer; import java.util.ArrayList; import java.util.BitSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; +import org.apache.commons.lang.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.slf4j.Logger; @@ -57,6 +60,9 @@ import org.junit.Test; import com.google.common.collect.Sets; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; public class TestGroupedSplits { @@ -118,9 +124,9 @@ public class TestGroupedSplits { // we should have a single split as the length is comfortably smaller than // the block size - Assert.assertEquals("We got more than one splits!", 1, splits.length); + assertEquals("We got more than one splits!", 1, splits.length); InputSplit split = splits[0]; - Assert.assertEquals("It should be TezGroupedSplit", + assertEquals("It should be TezGroupedSplit", TezGroupedSplit.class, split.getClass()); // check the split @@ -137,7 +143,7 @@ public class TestGroupedSplits { LOG.warn("conflict with " + v + " at position "+reader.getPos()); } - Assert.assertFalse("Key in multiple partitions.", bits.get(v)); + assertFalse("Key in multiple partitions.", bits.get(v)); bits.set(v); count++; } @@ -145,7 +151,7 @@ public class TestGroupedSplits { } finally { reader.close(); } - Assert.assertEquals("Some keys in no partition.", length, bits.cardinality()); + assertEquals("Some keys in no partition.", length, bits.cardinality()); } } @@ -260,14 +266,14 @@ public class TestGroupedSplits { if (j==1) { // j==1 covers single split corner case // and does not do grouping - Assert.assertEquals("compressed splits == " + j, j, splits.length); + assertEquals("compressed splits == " + j, j, splits.length); } List<Text> results = new ArrayList<Text>(); for (int i=0; i<splits.length; ++i) { List<Text> read = readSplit(format, splits[i], job); results.addAll(read); } - Assert.assertEquals("splits length", 11, results.size()); + assertEquals("splits length", 11, results.size()); final String[] firstList = {"the quick", "brown", "fox jumped", "over", " the lazy", " dog"}; @@ -293,7 +299,7 @@ public class TestGroupedSplits { private static int testResults(List<Text> results, String[] first, int start) { for (int i = 0; i < first.length; i++) { - Assert.assertEquals("splits["+i+"]", first[i], results.get(start+i).toString()); + assertEquals("splits["+i+"]", first[i], results.get(start+i).toString()); } return first.length+start; } @@ -324,17 +330,17 @@ public class TestGroupedSplits { // desired splits not set. We end up choosing min/max split size based on // total data and num original splits. In this case, min size will be hit InputSplit[] splits = format.getSplits(job, 0); - Assert.assertEquals(25, splits.length); + assertEquals(25, splits.length); // split too big. override with max format.setDesiredNumberOfSplits(1); splits = format.getSplits(job, 0); - Assert.assertEquals(4, splits.length); + assertEquals(4, splits.length); // splits too small. override with min format.setDesiredNumberOfSplits(1000); splits = format.getSplits(job, 0); - Assert.assertEquals(25, splits.length); + assertEquals(25, splits.length); } @@ -398,7 +404,7 @@ public class TestGroupedSplits { // the remainig 3 splits (1 from each node) will be grouped at rack level (default-rack) // all of them will maintain ordering InputSplit[] groupedSplits = grouper.getGroupedSplits(conf, origSplits, 4, "InputFormat"); - Assert.assertEquals(4, groupedSplits.length); + assertEquals(4, groupedSplits.length); for (int i=0; i<4; ++i) { TezGroupedSplit split = (TezGroupedSplit)groupedSplits[i]; List<InputSplit> innerSplits = split.getGroupedSplits(); @@ -406,12 +412,12 @@ public class TestGroupedSplits { // splits in group maintain original order for (InputSplit innerSplit : innerSplits) { int splitPos = ((TestInputSplit) innerSplit).getPosition(); - Assert.assertTrue(pos < splitPos); + assertTrue(pos < splitPos); pos = splitPos; } // last one is rack split if (i==3) { - Assert.assertTrue(split.getRack() != null); + assertTrue(split.getRack() != null); } } } @@ -456,24 +462,25 @@ public class TestGroupedSplits { // all of them will maintain ordering InputSplit[] groupedSplits1 = grouper.getGroupedSplits(conf, origSplits, 4, "InputFormat"); InputSplit[] groupedSplits2 = grouper.getGroupedSplits(conf, origSplits, 4, "InputFormat"); - Assert.assertEquals(4, groupedSplits1.length); - Assert.assertEquals(4, groupedSplits2.length); + // KKK Start looking here. + assertEquals(4, groupedSplits1.length); + assertEquals(4, groupedSplits2.length); // check both split groups are the same. this depends on maintaining split order tested above for (int i=0; i<4; ++i) { TezGroupedSplit gSplit1 = ((TezGroupedSplit) groupedSplits1[i]); List<InputSplit> testSplits1 = gSplit1.getGroupedSplits(); TezGroupedSplit gSplit2 = ((TezGroupedSplit) groupedSplits2[i]); List<InputSplit> testSplits2 = gSplit2.getGroupedSplits(); - Assert.assertEquals(testSplits1.size(), testSplits2.size()); + assertEquals(testSplits1.size(), testSplits2.size()); for (int j=0; j<testSplits1.size(); j++) { TestInputSplit split1 = (TestInputSplit) testSplits1.get(j); TestInputSplit split2 = (TestInputSplit) testSplits2.get(j); - Assert.assertEquals(split1.position, split2.position); + assertEquals(split1.position, split2.position); } if (i==3) { // check for rack split creation. Ensures repeatability holds for rack splits also - Assert.assertTrue(gSplit1.getRack() != null); - Assert.assertTrue(gSplit2.getRack() != null); + assertTrue(gSplit1.getRack() != null); + assertTrue(gSplit2.getRack() != null); } } } @@ -502,12 +509,12 @@ public class TestGroupedSplits { format.setDesiredNumberOfSplits(1); InputSplit[] splits = format.getSplits(job, 1); - Assert.assertEquals(1, splits.length); + assertEquals(1, splits.length); TezGroupedSplit split = (TezGroupedSplit) splits[0]; // all 3 splits are present - Assert.assertEquals(numSplits, split.wrappedSplits.size()); + assertEquals(numSplits, split.wrappedSplits.size()); Set<InputSplit> splitSet = Sets.newHashSet(split.wrappedSplits); - Assert.assertEquals(numSplits, splitSet.size()); + assertEquals(numSplits, splitSet.size()); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -540,10 +547,10 @@ public class TestGroupedSplits { format.setDesiredNumberOfSplits(1); InputSplit[] splits = format.getSplits(job, 1); - Assert.assertEquals(1, splits.length); + assertEquals(1, splits.length); TezGroupedSplit split = (TezGroupedSplit) splits[0]; // all 3 splits are present - Assert.assertEquals(numSplits, split.wrappedSplits.size()); + assertEquals(numSplits, split.wrappedSplits.size()); ByteArrayOutputStream bOut = new ByteArrayOutputStream(); split.write(new DataOutputStream(bOut)); } @@ -589,17 +596,17 @@ public class TestGroupedSplits { format.setDesiredNumberOfSplits(numSplits); InputSplit[] splits = format.getSplits(job, 1); - Assert.assertEquals(numSplits, splits.length); + assertEquals(numSplits, splits.length); for (int i = 0 ; i < numSplits ; i++) { TezGroupedSplit split = (TezGroupedSplit) splits[i]; // all 3 splits are present - Assert.assertEquals(1, split.wrappedSplits.size()); + assertEquals(1, split.wrappedSplits.size()); if (i==3) { - Assert.assertEquals(1, split.getLocations().length); - Assert.assertEquals(validLocation, split.getLocations()[0]); + assertEquals(1, split.getLocations().length); + assertEquals(validLocation, split.getLocations()[0]); } else if (i==4) { - Assert.assertEquals(1, split.getLocations().length); - Assert.assertTrue(split.getLocations()[0].equals(validLocation) || split.getLocations()[0].equals(validLocation2)); + assertEquals(1, split.getLocations().length); + assertTrue(split.getLocations()[0].equals(validLocation) || split.getLocations()[0].equals(validLocation2)); } else { Assert.assertNull(split.getLocations()); } @@ -662,16 +669,16 @@ public class TestGroupedSplits { InputSplit[] splits = format.getSplits(job, 1); // due to the min = 12Mb - Assert.assertEquals(2, splits.length); + assertEquals(2, splits.length); for (InputSplit group : splits) { TezGroupedSplit split = (TezGroupedSplit) group; if (split.wrappedSplits.size() == 2) { // split1+split2 - Assert.assertEquals(split.getLength(), 2 * 1000 * 1000l); + assertEquals(split.getLength(), 2 * 1000 * 1000l); } else { // split3 - Assert.assertEquals(split.getLength(), 2 * 1000 * 1000l + 1); + assertEquals(split.getLength(), 2 * 1000 * 1000l + 1); } } } @@ -708,14 +715,14 @@ public class TestGroupedSplits { "MockInputForamt", null, locationProvider); // Sanity. 1 group, with 3 splits. - Assert.assertEquals(1, groupedSplits.length); - Assert.assertTrue(groupedSplits[0] instanceof TezGroupedSplit); + assertEquals(1, groupedSplits.length); + assertTrue(groupedSplits[0] instanceof TezGroupedSplit); TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[0]; - Assert.assertEquals(3, groupedSplit.getGroupedSplits().size()); + assertEquals(3, groupedSplit.getGroupedSplits().size()); // Verify that the split ends up being grouped to the custom location. - Assert.assertEquals(1, groupedSplit.getLocations().length); - Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]); + assertEquals(1, groupedSplit.getLocations().length); + assertEquals("customLocation", groupedSplit.getLocations()[0]); } // Original splits returned. @@ -749,15 +756,114 @@ public class TestGroupedSplits { "MockInputForamt", null, locationProvider); // Sanity. 3 group, with 1 split each - Assert.assertEquals(3, groupedSplits.length); + assertEquals(3, groupedSplits.length); for (int i = 0 ; i < 3 ; i++) { - Assert.assertTrue(groupedSplits[i] instanceof TezGroupedSplit); + assertTrue(groupedSplits[i] instanceof TezGroupedSplit); TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[i]; - Assert.assertEquals(1, groupedSplit.getGroupedSplits().size()); + assertEquals(1, groupedSplit.getGroupedSplits().size()); // Verify the splits have their final location set to customLocation - Assert.assertEquals(1, groupedSplit.getLocations().length); - Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]); + assertEquals(1, groupedSplit.getLocations().length); + assertEquals("customLocation", groupedSplit.getLocations()[0]); + } + } + + @Test(timeout = 5000) + public void testForceNodeLocalSplits() throws IOException { + int numLocations = 7; + long splitLen = 100L; + String[] locations = new String[numLocations]; + for (int i = 0; i < numLocations; i++) { + locations[i] = "node" + i; + } + + // Generate 24 splits (6 per node) spread evenly across node0-node3. + // Generate 1 split each on the remaining 3 nodes (4-6) + int numSplits = 27; + InputSplit[] rawSplits = new InputSplit[numSplits]; + for (int i = 0; i < 27; i++) { + String splitLoc[] = new String[1]; + if (i < 24) { + splitLoc[0] = locations[i % 4]; + } else { + splitLoc[0] = locations[4 + i % 24]; + } + rawSplits[i] = new TestInputSplit(splitLen, splitLoc, i); + } + + TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper(); + JobConf confDisallowSmallEarly = new JobConf(defaultConf); + confDisallowSmallEarly = (JobConf) TezSplitGrouper.newConfigBuilder(confDisallowSmallEarly) + .setGroupingSplitSize(splitLen * 3, splitLen * 3) + .setGroupingRackSplitSizeReduction(1) + .setNodeLocalGroupsOnly(false) + .build(); + + JobConf confSmallEarly = new JobConf(defaultConf); + confSmallEarly = (JobConf) TezSplitGrouper.newConfigBuilder(confSmallEarly) + .setGroupingSplitSize(splitLen * 3, splitLen * 3) + .setGroupingRackSplitSizeReduction(1) + .setNodeLocalGroupsOnly(true) + .build(); + + // Without early grouping -> 4 * 2 node local, 1 merged - 9 total + // With early grouping -> 4 * 2 node local (first 4 nodes), 3 smaller node local (4-6) -> 11 total + + // Requesting 9 based purely on size. + InputSplit[] groupedSplitsDisallowSmallEarly = + grouper.getGroupedSplits(confDisallowSmallEarly, rawSplits, 9, "InputFormat"); + assertEquals(9, groupedSplitsDisallowSmallEarly.length); + // Verify the actual splits as well. + Map<String, MutableInt> matchedLocations = new HashMap<>(); + verifySplitsFortestAllowSmallSplitsEarly(groupedSplitsDisallowSmallEarly); + TezGroupedSplit group = (TezGroupedSplit) groupedSplitsDisallowSmallEarly[8]; + assertEquals(3, group.getLocations().length); + assertEquals(3, group.getGroupedSplits().size()); + Set<String> exp = Sets.newHashSet(locations[4], locations[5], locations[6]); + for (int i = 0; i < 3; i++) { + LOG.info(group.getLocations()[i]); + exp.remove(group.getLocations()[i]); + } + assertEquals(0, exp.size()); + + InputSplit[] groupedSplitsSmallEarly = + grouper.getGroupedSplits(confSmallEarly, rawSplits, 9, "InputFormat"); + assertEquals(11, groupedSplitsSmallEarly.length); + // The first 8 are the larger groups. + verifySplitsFortestAllowSmallSplitsEarly(groupedSplitsSmallEarly); + exp = Sets.newHashSet(locations[4], locations[5], locations[6]); + for (int i = 8; i < 11; i++) { + group = (TezGroupedSplit) groupedSplitsSmallEarly[i]; + assertEquals(1, group.getLocations().length); + assertEquals(1, group.getGroupedSplits().size()); + String matchedLoc = group.getLocations()[0]; + assertTrue(exp.contains(matchedLoc)); + exp.remove(matchedLoc); + } + assertEquals(0, exp.size()); + } + + private void verifySplitsFortestAllowSmallSplitsEarly(InputSplit[] groupedSplits) throws + IOException { + Map<String, MutableInt> matchedLocations = new HashMap<>(); + for (int i = 0; i < 8; i++) { + TezGroupedSplit group = (TezGroupedSplit) groupedSplits[i]; + assertEquals(1, group.getLocations().length); + assertEquals(3, group.getGroupedSplits().size()); + String matchedLoc = group.getLocations()[0]; + MutableInt count = matchedLocations.get(matchedLoc); + if (count == null) { + count = new MutableInt(0); + matchedLocations.put(matchedLoc, count); + } + count.increment(); + } + for (Map.Entry<String, MutableInt> entry : matchedLocations.entrySet()) { + String loc = entry.getKey(); + int nodeId = Character.getNumericValue(loc.charAt(loc.length() - 1)); + assertTrue(nodeId < 4); + assertTrue(loc.startsWith("node") && loc.length() == 5); + assertEquals(2, entry.getValue().getValue()); } }
