TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly (gopalv)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/26518d5d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/26518d5d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/26518d5d Branch: refs/heads/TEZ-2003 Commit: 26518d5dae1b04b3ea48b0518abd6b2312e279b9 Parents: 5e2a55f Author: Gopal V <[email protected]> Authored: Mon Apr 6 13:58:21 2015 -0700 Committer: Gopal V <[email protected]> Committed: Mon Apr 6 13:58:21 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../hadoop/mapred/split/SplitSizeEstimator.java | 29 ++++++++ .../split/TezGroupedSplitsInputFormat.java | 13 +++- .../mapred/split/TezMapredSplitsGrouper.java | 30 +++++++-- .../mapreduce/split/SplitSizeEstimator.java | 27 ++++++++ .../split/TezGroupedSplitsInputFormat.java | 13 +++- .../split/TezMapReduceSplitsGrouper.java | 33 ++++++++-- .../hadoop/mapred/split/TestGroupedSplits.java | 69 ++++++++++++++++++++ 8 files changed, 202 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8fad569..ee0ef70 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,8 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES TEZ-2176. Move all logging to slf4j. (commons-logging jar no longer part of Tez tar) + TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly + ALL CHANGES: TEZ-2232. 
Allow setParallelism to be called multiple times before tasks get http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitSizeEstimator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitSizeEstimator.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitSizeEstimator.java new file mode 100644 index 0000000..a4c0e73 --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitSizeEstimator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.split; + +import java.io.IOException; + +import org.apache.hadoop.mapred.InputSplit; + +/* the big difference between the mapred.* and mapreduce.* split classes + * is that mapreduce throws InterruptedException */ +public interface SplitSizeEstimator { + long getEstimatedSize(InputSplit split) throws IOException; +} http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java index ddfb856..707f9ad 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.split.SplitSizeEstimator; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TezUncheckedException; @@ -50,6 +51,8 @@ public class TezGroupedSplitsInputFormat<K, V> InputFormat<K, V> wrappedInputFormat; int desiredNumSplits = 0; Configuration conf; + + SplitSizeEstimator estimator; public TezGroupedSplitsInputFormat() { @@ -61,6 +64,14 @@ public class TezGroupedSplitsInputFormat<K, V> LOG.debug("wrappedInputFormat: " + wrappedInputFormat.getClass().getName()); } } + + public void setSplitSizeEstimator(SplitSizeEstimator estimator) { + Preconditions.checkArgument(estimator != null); + this.estimator = estimator; + if (LOG.isDebugEnabled()) { + LOG.debug("Split size estimator : " + estimator); + } + } public void
setDesiredNumberOfSplits(int num) { Preconditions.checkArgument(num >= 0); @@ -75,7 +86,7 @@ public class TezGroupedSplitsInputFormat<K, V> InputSplit[] originalSplits = wrappedInputFormat.getSplits(job, numSplits); TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper(); String wrappedInputFormatName = wrappedInputFormat.getClass().getName(); - return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName); + return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java index 4ef50fd..29b5e1e 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.split.SplitSizeEstimator; import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tez.dag.api.TezUncheckedException; @@ -81,6 +82,15 @@ public class TezMapredSplitsGrouper { } } + private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator(); + + static final class DefaultSplitSizeEstimator implements SplitSizeEstimator { + @Override + public long getEstimatedSize(InputSplit split) throws IOException { + return 
split.getLength(); + } + } + Map<String, LocationHolder> createLocationsMap(Configuration conf) { if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) { @@ -92,6 +102,12 @@ public class TezMapredSplitsGrouper { public InputSplit[] getGroupedSplits(Configuration conf, InputSplit[] originalSplits, int desiredNumSplits, String wrappedInputFormatName) throws IOException { + return getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, null); + } + + public InputSplit[] getGroupedSplits(Configuration conf, + InputSplit[] originalSplits, int desiredNumSplits, + String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException { LOG.info("Grouping splits in Tez"); int configNumSplits = conf.getInt(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_COUNT, 0); @@ -100,7 +116,11 @@ public class TezMapredSplitsGrouper { desiredNumSplits = configNumSplits; LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits); } - + + if (estimator == null) { + estimator = DEFAULT_SPLIT_ESTIMATOR; + } + if (! 
(configNumSplits > 0 || originalSplits == null || originalSplits.length == 0) ) { @@ -110,7 +130,7 @@ public class TezMapredSplitsGrouper { // Do sanity checks long totalLength = 0; for (InputSplit split : originalSplits) { - totalLength += split.getLength(); + totalLength += estimator.getEstimatedSize(split); } int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.length; @@ -183,7 +203,7 @@ public class TezMapredSplitsGrouper { Map<String, LocationHolder> distinctLocations = createLocationsMap(conf); // go through splits and add them to locations for (InputSplit split : originalSplits) { - totalLength += split.getLength(); + totalLength += estimator.getEstimatedSize(split); String[] locations = split.getLocations(); if (locations == null || locations.length == 0) { locations = emptyLocations; @@ -272,13 +292,13 @@ public class TezMapredSplitsGrouper { int groupNumSplits = 0; do { group.add(splitHolder); - groupLength += splitHolder.split.getLength(); + groupLength += estimator.getEstimatedSize(splitHolder.split); groupNumSplits++; holder.incrementHeadIndex(); splitHolder = holder.getUnprocessedHeadSplit(); } while(splitHolder != null && (!groupByLength || - (groupLength + splitHolder.split.getLength() <= lengthPerGroup)) + (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup)) && (!groupByCount || (groupNumSplits + 1 <= numSplitsInGroup))); http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitSizeEstimator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitSizeEstimator.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitSizeEstimator.java new file mode 100644 index 0000000..2f8d74f --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitSizeEstimator.java @@ -0,0 +1,27 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.split; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.InputSplit; + +public interface SplitSizeEstimator { + long getEstimatedSize(InputSplit split) throws InterruptedException, IOException; +} http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java index f5999b3..519b52a 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java @@ -52,6 +52,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V> InputFormat<K, V> wrappedInputFormat; int desiredNumSplits = 0; Configuration conf; + SplitSizeEstimator estimator; public TezGroupedSplitsInputFormat() { @@ -71,7 +72,15 @@ public class 
TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V> LOG.debug("desiredNumSplits: " + desiredNumSplits); } } - + + public void setSplitSizeEstimator(SplitSizeEstimator estimator) { + Preconditions.checkArgument(estimator != null); + this.estimator = estimator; + if (LOG.isDebugEnabled()) { + LOG.debug("Split size estimator : " + estimator); + } + } + class SplitHolder { InputSplit split; boolean isProcessed = false; @@ -110,7 +119,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V> List<InputSplit> originalSplits = wrappedInputFormat.getSplits(context); TezMapReduceSplitsGrouper grouper = new TezMapReduceSplitsGrouper(); String wrappedInputFormatName = wrappedInputFormat.getClass().getName(); - return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName); + return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java index 6caeba4..88b9845 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java @@ -139,7 +139,17 @@ public class TezMapReduceSplitsGrouper { headIndex++; } } - + + private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator(); + + static final class DefaultSplitSizeEstimator implements SplitSizeEstimator { + @Override + public long getEstimatedSize(InputSplit split) throws InterruptedException, + IOException { + return 
split.getLength(); + } + } + Map<String, LocationHolder> createLocationsMap(Configuration conf) { if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) { @@ -151,6 +161,13 @@ public class TezMapReduceSplitsGrouper { public List<InputSplit> getGroupedSplits(Configuration conf, List<InputSplit> originalSplits, int desiredNumSplits, String wrappedInputFormatName) throws IOException, InterruptedException { + return getGroupedSplits(conf, originalSplits, desiredNumSplits, + wrappedInputFormatName, null); + } + + public List<InputSplit> getGroupedSplits(Configuration conf, + List<InputSplit> originalSplits, int desiredNumSplits, + String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException, InterruptedException { LOG.info("Grouping splits in Tez"); int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0); @@ -159,7 +176,11 @@ public class TezMapReduceSplitsGrouper { desiredNumSplits = configNumSplits; LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits); } - + + if (estimator == null) { + estimator = DEFAULT_SPLIT_ESTIMATOR; + } + if (! 
(configNumSplits > 0 || originalSplits == null || originalSplits.size() == 0)) { @@ -170,7 +191,7 @@ public class TezMapReduceSplitsGrouper { // Do sanity checks long totalLength = 0; for (InputSplit split : originalSplits) { - totalLength += split.getLength(); + totalLength += estimator.getEstimatedSize(split); } int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size(); @@ -239,7 +260,7 @@ public class TezMapReduceSplitsGrouper { Map<String, LocationHolder> distinctLocations = createLocationsMap(conf); // go through splits and add them to locations for (InputSplit split : originalSplits) { - totalLength += split.getLength(); + totalLength += estimator.getEstimatedSize(split); String[] locations = split.getLocations(); if (locations == null || locations.length == 0) { locations = emptyLocations; @@ -328,13 +349,13 @@ public class TezMapReduceSplitsGrouper { int groupNumSplits = 0; do { group.add(splitHolder); - groupLength += splitHolder.split.getLength(); + groupLength += estimator.getEstimatedSize(splitHolder.split); groupNumSplits++; holder.incrementHeadIndex(); splitHolder = holder.getUnprocessedHeadSplit(); } while(splitHolder != null && (!groupByLength || - (groupLength + splitHolder.split.getLength() <= lengthPerGroup)) + (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup)) && (!groupByCount || (groupNumSplits + 1 <= numSplitsInGroup))); http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java index 689ea2c..13b69c8 100644 --- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java +++ 
b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java @@ -47,6 +47,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.split.SplitSizeEstimator; import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.common.MockDNSToSwitchMapping; @@ -547,4 +548,72 @@ public class TestGroupedSplits { split.write(new DataOutputStream(bOut)); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test(timeout=10000) + public void testGroupedSplitWithEstimator() throws IOException { + JobConf job = new JobConf(defaultConf); + + job = (JobConf) TezMapReduceSplitsGrouper.createConfigBuilder(job) + .setGroupingSplitSize(12*1000*1000l, 25*1000*1000l) + .build(); + + InputFormat mockWrappedFormat = mock(InputFormat.class); + TezGroupedSplitsInputFormat<LongWritable , Text> format = + new TezGroupedSplitsInputFormat<LongWritable, Text>(); + format.setConf(job); + format.setInputFormat(mockWrappedFormat); + + final InputSplit mockSplit1 = mock(InputSplit.class); + final InputSplit mockSplit2 = mock(InputSplit.class); + final InputSplit mockSplit3 = mock(InputSplit.class); + + final String[] locations = new String[] { "common", "common", "common" }; + + final SplitSizeEstimator estimator = new SplitSizeEstimator() { + + @Override + public long getEstimatedSize(InputSplit split) throws IOException { + LOG.info("Estimating 10x of " + split.getLength()); + // 10x compression + return 10 * split.getLength(); + } + }; + + when(mockSplit1.getLength()).thenReturn(1000 * 1000l); + when(mockSplit1.getLocations()).thenReturn(locations); + + when(mockSplit2.getLength()).thenReturn(1000 * 1000l); + when(mockSplit2.getLocations()).thenReturn(locations); + + when(mockSplit3.getLength()).thenReturn(2 * 1000 * 1000l + 1); + 
when(mockSplit3.getLocations()).thenReturn(locations); + + // put multiple splits which should be grouped (1,1,2) Mb, but estimated to be 10x + // 10,10,20Mb - grouped with min=12Mb, max=25Mb + // should be grouped as (1,1),(2) + InputSplit[] mockSplits = new InputSplit[] { mockSplit1, mockSplit2, + mockSplit3 }; + + when(mockWrappedFormat.getSplits((JobConf) anyObject(), anyInt())) + .thenReturn(mockSplits); + + format.setDesiredNumberOfSplits(1); + format.setSplitSizeEstimator(estimator); + + InputSplit[] splits = format.getSplits(job, 1); + // due to the min = 12Mb + Assert.assertEquals(2, splits.length); + + for (InputSplit group : splits) { + TezGroupedSplit split = (TezGroupedSplit) group; + if (split.wrappedSplits.size() == 2) { + // split1+split2 + Assert.assertEquals(split.getLength(), 2 * 1000 * 1000l); + } else { + // split3 + Assert.assertEquals(split.getLength(), 2 * 1000 * 1000l + 1); + } + } + } + }
