Repository: tez Updated Branches: refs/heads/master a2b6eac1c -> 125f8c023
TEZ-3552. Shuffle split array when size-based sorting is turned off. (Zhiyuan Yang via mingma) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/125f8c02 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/125f8c02 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/125f8c02 Branch: refs/heads/master Commit: 125f8c0239a318f4a025b2df9cbfaea16fb49e1a Parents: a2b6eac Author: Ming Ma <[email protected]> Authored: Wed Dec 7 17:25:42 2016 -0800 Committer: Ming Ma <[email protected]> Committed: Wed Dec 7 17:25:42 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/hadoop/MRInputHelpers.java | 2 + .../common/TestMRInputAMSplitGenerator.java | 58 ++++++++++++-------- 3 files changed, 38 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/125f8c02/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b15686e..a7299b7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3552. Shuffle split array when size-based sorting is turned off. TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez TEZ-3271. Provide mapreduce failures.maxpercent equivalent. TEZ-3222. Reduce messaging overhead for auto-reduce parallelism case. http://git-wip-us.apache.org/repos/asf/tez/blob/125f8c02/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index b0a76fa..2f3d7ce 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -453,6 +453,8 @@ public class MRInputHelpers { // sort the splits into order based on size, so that the biggest // go first Arrays.sort(splits, new InputSplitComparator()); + } else { + Collections.shuffle(Arrays.asList(splits)); } return splits; } http://git-wip-us.apache.org/repos/asf/tez/blob/125f8c02/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java index bd4e5a9..6cf2700 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java @@ -85,7 +85,10 @@ public class TestMRInputAMSplitGenerator { private void testGroupSplitsAndSortSplits(boolean groupSplitsEnabled, boolean sortSplitsEnabled) throws Exception { Configuration conf = new Configuration(); - String[] splitLengths = new String[] {"1000", "2000", "3000"}; + String[] splitLengths = new String[50]; + for (int i = 0; i < splitLengths.length; i++) { + splitLengths[i] = Integer.toString(1000 * (i + 1)); + } conf.setStrings(SPLITS_LENGTHS, splitLengths); DataSourceDescriptor dataSource = MRInput.createConfigBuilder( conf, InputFormatForTest.class). @@ -99,39 +102,48 @@ public class TestMRInputAMSplitGenerator { List<Event> events = splitGenerator.initialize(); - assertEquals(splitLengths.length + 1, events.size()); assertTrue(events.get(0) instanceof InputConfigureVertexTasksEvent); - for (int i = 1; i < splitLengths.length + 1; i++) { + boolean shuffled = false; + InputSplit previousIs = null; + int numRawInputSplits = 0; + for (int i = 1; i < events.size(); i++) { assertTrue(events.get(i) instanceof InputDataInformationEvent); InputDataInformationEvent diEvent = (InputDataInformationEvent) (events.get(i)); assertNull(diEvent.getDeserializedUserPayload()); assertNotNull(diEvent.getUserPayload()); MRSplitProto eventProto = MRSplitProto.parseFrom(ByteString.copyFrom( diEvent.getUserPayload())); - InputSplit is = MRInputUtils.getNewSplitDetailsFromEvent(eventProto, new Configuration()); + InputSplit is = MRInputUtils.getNewSplitDetailsFromEvent( + eventProto, new Configuration()); if (groupSplitsEnabled) { - // For this configuration, there is no actual split grouping. - is = ((TezGroupedSplit)is).getGroupedSplits().get(0); + numRawInputSplits += ((TezGroupedSplit)is).getGroupedSplits().size(); + for (InputSplit inputSplit : ((TezGroupedSplit)is).getGroupedSplits()) { + assertTrue(inputSplit instanceof InputSplitForTest); + } + assertTrue(((TezGroupedSplit)is).getGroupedSplits().get(0) + instanceof InputSplitForTest); + } else { + numRawInputSplits++; + assertTrue(is instanceof InputSplitForTest); } - assertTrue(is instanceof InputSplitForTest); // The splits in the list returned from InputFormat has ascending - // size in order. MRInputAMSplitGenerator might sort the list - // from InputFormat depending on sortSplitsEnabled. - if (i == 1) { - // The first split returned from MRInputAMSplitGenerator. - // When sort split is enabled, the first split returned from - // MRInputAMSplitGenerator is the last split in the list returned - // from InputFormat. - assertEquals(sortSplitsEnabled ? splitLengths.length : 1, - ((InputSplitForTest) is).getIdentifier()); - } else if (i == splitLengths.length) { - // The last split returned from MRInputAMSplitGenerator - // When sort split is enabled, the last split returned from - // MRInputAMSplitGenerator is the first split in the list returned - // from InputFormat. - assertEquals(sortSplitsEnabled ? 1 : splitLengths.length, - ((InputSplitForTest) is).getIdentifier()); + // size in order. + // If sortSplitsEnabled is true, MRInputAMSplitGenerator will sort the + // splits in descending order. + // If sortSplitsEnabled is false, MRInputAMSplitGenerator will shuffle + // the splits. + if (previousIs != null) { + if (sortSplitsEnabled) { + assertTrue(is.getLength() <= previousIs.getLength()); + } else { + shuffled |= (is.getLength() > previousIs.getLength()); + } } + previousIs = is; + } + assertEquals(splitLengths.length, numRawInputSplits); + if (!sortSplitsEnabled) { + assertTrue(shuffled); } }
