TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and TezMapReduceSplitsGrouper. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6632903b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6632903b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6632903b Branch: refs/heads/master Commit: 6632903bb3cb70b4717b64f3f78664f34812ec5b Parents: c89e352 Author: Siddharth Seth <[email protected]> Authored: Wed Oct 14 17:42:35 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Oct 14 17:42:35 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + tez-mapreduce/findbugs-exclude.xml | 14 + .../hadoop/mapred/split/TezGroupedSplit.java | 11 +- .../mapred/split/TezMapredSplitsGrouper.java | 459 ++------------- .../hadoop/mapreduce/split/TezGroupedSplit.java | 19 +- .../split/TezMapReduceSplitsGrouper.java | 573 ++++--------------- .../common/MRInputAMSplitGenerator.java | 5 +- .../grouper/GroupedSplitContainer.java | 74 +++ .../grouper/MapReduceSplitContainer.java | 64 +++ .../mapreduce/grouper/MapredSplitContainer.java | 64 +++ .../tez/mapreduce/grouper/SplitContainer.java | 41 ++ .../grouper/SplitSizeEstimatorWrapper.java | 30 + .../SplitSizeEstimatorWrapperMapReduce.java | 35 ++ .../SplitSizeEstimatorWrapperMapred.java | 35 ++ .../tez/mapreduce/grouper/TezSplitGrouper.java | 571 ++++++++++++++++++ .../hadoop/mapred/split/TestGroupedSplits.java | 11 +- 16 files changed, 1120 insertions(+), 887 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 17735b5..7c2f030 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: TEZ-2887. Tez build failure due to missing dependency in pom files. + TEZ-1692. 
Reduce code duplication between TezMapredSplitsGrouper and TezMapreduceSplitsGrouper. Release 0.8.1-alpha: 2015-10-12 http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-mapreduce/findbugs-exclude.xml b/tez-mapreduce/findbugs-exclude.xml index 873d4a2..ec64739 100644 --- a/tez-mapreduce/findbugs-exclude.xml +++ b/tez-mapreduce/findbugs-exclude.xml @@ -70,6 +70,13 @@ </Match> <Match> + <Class name="org.apache.tez.mapreduce.grouper.GroupedSplitContainer"/> + <Method name="getLocations"/> + <Field name="locations"/> + <Bug pattern="EI_EXPOSE_REP"/> + </Match> + + <Match> <Class name="org.apache.tez.mapreduce.hadoop.InputSplitInfoMem"/> <Method name="getNewFormatSplits"/> <Field name="newFormatSplits"/> @@ -98,6 +105,13 @@ </Match> <Match> + <Class name="org.apache.tez.mapreduce.grouper.GroupedSplitContainer"/> + <Method name="<init>"/> + <Field name="locations"/> + <Bug pattern="EI_EXPOSE_REP2"/> + </Match> + + <Match> <Class name="org.apache.tez.mapreduce.hadoop.InputSplitInfoMem"/> <Method name="<init>"/> <Field name="oldFormatSplits"/> http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java index a9893aa..bc58043 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java @@ -53,7 +53,16 @@ public class TezGroupedSplit implements InputSplit, Configurable { public TezGroupedSplit() { } - + + public TezGroupedSplit(List<InputSplit> wrappedSplits, String wrappedInputFormatName, + String[] locations, 
String rack, long length) { + this.wrappedSplits = wrappedSplits; + this.wrappedInputFormatName = wrappedInputFormatName; + this.locations = locations; + this.rack = rack; + this.length = length; + } + public TezGroupedSplit(int numSplits, String wrappedInputFormatName, String[] locations, String rack) { this.wrappedSplits = new ArrayList<InputSplit>(numSplits); http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java index 2194551..f2a8a0c 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java @@ -19,25 +19,23 @@ package org.apache.hadoop.mapred.split; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; +import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.tez.mapreduce.grouper.GroupedSplitContainer; +import org.apache.tez.mapreduce.grouper.MapredSplitContainer; +import org.apache.tez.mapreduce.grouper.SplitContainer; +import org.apache.tez.mapreduce.grouper.SplitSizeEstimatorWrapperMapred; +import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; -import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.tez.dag.api.TezUncheckedException; - -import com.google.common.base.Preconditions; /** * A Helper that provides grouping logic to group InputSplits @@ -46,58 +44,9 @@ import com.google.common.base.Preconditions; */ @Public @Evolving -public class TezMapredSplitsGrouper { +public class TezMapredSplitsGrouper extends TezSplitGrouper { private static final Logger LOG = LoggerFactory.getLogger(TezMapredSplitsGrouper.class); - static class SplitHolder { - InputSplit split; - boolean isProcessed = false; - SplitHolder(InputSplit split) { - this.split = split; - } - } - - static class LocationHolder { - List<SplitHolder> splits; - int headIndex = 0; - LocationHolder(int capacity) { - splits = new ArrayList<SplitHolder>(capacity); - } - boolean isEmpty() { - return (headIndex == splits.size()); - } - SplitHolder getUnprocessedHeadSplit() { - while (!isEmpty()) { - SplitHolder holder = splits.get(headIndex); - if (!holder.isProcessed) { - return holder; - } - incrementHeadIndex(); - } - return null; - } - void incrementHeadIndex() { - headIndex++; - } - } - - private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator(); - - static final class DefaultSplitSizeEstimator implements SplitSizeEstimator { - @Override - public long getEstimatedSize(InputSplit split) throws IOException { - return split.getLength(); - } - } - - Map<String, LocationHolder> createLocationsMap(Configuration conf) { - if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, - TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) { - return new TreeMap<String, LocationHolder>(); - } - return new HashMap<String, LocationHolder>(); - } - public InputSplit[] getGroupedSplits(Configuration conf, InputSplit[] originalSplits, int desiredNumSplits, String wrappedInputFormatName) 
throws IOException { @@ -107,367 +56,41 @@ public class TezMapredSplitsGrouper { public InputSplit[] getGroupedSplits(Configuration conf, InputSplit[] originalSplits, int desiredNumSplits, String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException { - LOG.info("Grouping splits in Tez"); Preconditions.checkArgument(originalSplits != null, "Splits must be specified"); - int configNumSplits = conf.getInt(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_COUNT, 0); - if (configNumSplits > 0) { - // always use config override if specified - desiredNumSplits = configNumSplits; - LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits); - } - - if (estimator == null) { - estimator = DEFAULT_SPLIT_ESTIMATOR; - } - - if (! (configNumSplits > 0 || - originalSplits.length == 0) ) { - // numSplits has not been overridden by config - // numSplits has been set at runtime - // there are splits generated - // Do sanity checks - long totalLength = 0; - for (InputSplit split : originalSplits) { - totalLength += estimator.getEstimatedSize(split); - } - - int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.length; - long lengthPerGroup = totalLength/splitCount; - - long maxLengthPerGroup = conf.getLong( - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE, - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT); - long minLengthPerGroup = conf.getLong( - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE, - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT); - if (maxLengthPerGroup < minLengthPerGroup || - minLengthPerGroup <=0) { - throw new TezUncheckedException( - "Invalid max/min group lengths. Required min>0, max>=min. " + - " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup); - } - if (lengthPerGroup > maxLengthPerGroup) { - // splits too big to work. Need to override with max size. 
- int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1; - LOG.info("Desired splits: " + desiredNumSplits + " too small. " + - " Desired splitLength: " + lengthPerGroup + - " Max splitLength: " + maxLengthPerGroup + - " New desired splits: " + newDesiredNumSplits + - " Total length: " + totalLength + - " Original splits: " + originalSplits.length); - - desiredNumSplits = newDesiredNumSplits; - } else if (lengthPerGroup < minLengthPerGroup) { - // splits too small to work. Need to override with size. - int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1; - LOG.info("Desired splits: " + desiredNumSplits + " too large. " + - " Desired splitLength: " + lengthPerGroup + - " Min splitLength: " + minLengthPerGroup + - " New desired splits: " + newDesiredNumSplits + - " Total length: " + totalLength + - " Original splits: " + originalSplits.length); - - desiredNumSplits = newDesiredNumSplits; - } - } - - if (originalSplits == null) { - LOG.info("Null original splits"); - return null; - } - - if (desiredNumSplits == 0 || - originalSplits.length == 0 || - desiredNumSplits >= originalSplits.length) { - // nothing set. 
so return all the splits as is - LOG.info("Using original number of splits: " + originalSplits.length + - " desired splits: " + desiredNumSplits); - InputSplit[] groupedSplits = new TezGroupedSplit[originalSplits.length]; - int i=0; - for (InputSplit split : originalSplits) { - TezGroupedSplit newSplit = - new TezGroupedSplit(1, wrappedInputFormatName, split.getLocations()); - newSplit.addSplit(split); - groupedSplits[i++] = newSplit; - } - return groupedSplits; - } - - String emptyLocation = "EmptyLocation"; - String[] emptyLocations = {emptyLocation}; - List<InputSplit> groupedSplitsList = new ArrayList<InputSplit>(desiredNumSplits); - - long totalLength = 0; - Map<String, LocationHolder> distinctLocations = createLocationsMap(conf); - // go through splits and add them to locations - for (InputSplit split : originalSplits) { - totalLength += estimator.getEstimatedSize(split); - String[] locations = split.getLocations(); - if (locations == null || locations.length == 0) { - locations = emptyLocations; - } - for (String location : locations ) { - if (location == null) { - location = emptyLocation; - } - distinctLocations.put(location, null); - } - } - - long lengthPerGroup = totalLength/desiredNumSplits; - int numNodeLocations = distinctLocations.size(); - int numSplitsPerLocation = originalSplits.length/numNodeLocations; - int numSplitsInGroup = originalSplits.length/desiredNumSplits; - - // allocation loop here so that we have a good initial size for the lists - for (String location : distinctLocations.keySet()) { - distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1)); - } - - Set<String> locSet = new HashSet<String>(); - for (InputSplit split : originalSplits) { - locSet.clear(); - SplitHolder splitHolder = new SplitHolder(split); - String[] locations = split.getLocations(); - if (locations == null || locations.length == 0) { - locations = emptyLocations; - } - for (String location : locations) { - if (location == null) { - location = 
emptyLocation; - } - locSet.add(location); - } - for (String location : locSet) { - LocationHolder holder = distinctLocations.get(location); - holder.splits.add(splitHolder); - } - } - - boolean groupByLength = conf.getBoolean( - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH, - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT); - boolean groupByCount = conf.getBoolean( - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_COUNT, - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT); - if (!(groupByLength || groupByCount)) { - throw new TezUncheckedException( - "None of the grouping parameters are true: " - + TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH + ", " - + TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_COUNT); - } - LOG.info("Desired numSplits: " + desiredNumSplits + - " lengthPerGroup: " + lengthPerGroup + - " numLocations: " + numNodeLocations + - " numSplitsPerLocation: " + numSplitsPerLocation + - " numSplitsInGroup: " + numSplitsInGroup + - " totalLength: " + totalLength + - " numOriginalSplits: " + originalSplits.length + - " . 
Grouping by length: " + groupByLength + " count: " + groupByCount); - - // go through locations and group splits - int splitsProcessed = 0; - List<SplitHolder> group = new ArrayList<SplitHolder>(numSplitsInGroup+1); - Set<String> groupLocationSet = new HashSet<String>(10); - boolean allowSmallGroups = false; - boolean doingRackLocal = false; - int iterations = 0; - while (splitsProcessed < originalSplits.length) { - iterations++; - int numFullGroupsCreated = 0; - for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) { - group.clear(); - groupLocationSet.clear(); - String location = entry.getKey(); - LocationHolder holder = entry.getValue(); - SplitHolder splitHolder = holder.getUnprocessedHeadSplit(); - if (splitHolder == null) { - // all splits on node processed - continue; - } - int oldHeadIndex = holder.headIndex; - long groupLength = 0; - int groupNumSplits = 0; - do { - group.add(splitHolder); - groupLength += estimator.getEstimatedSize(splitHolder.split); - groupNumSplits++; - holder.incrementHeadIndex(); - splitHolder = holder.getUnprocessedHeadSplit(); - } while(splitHolder != null - && (!groupByLength || - (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup)) - && (!groupByCount || - (groupNumSplits + 1 <= numSplitsInGroup))); - - if (holder.isEmpty() - && !allowSmallGroups - && (!groupByLength || groupLength < lengthPerGroup/2) - && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) { - // group too small, reset it - holder.headIndex = oldHeadIndex; - continue; - } - - numFullGroupsCreated++; - - // One split group created - String[] groupLocation = {location}; - if (location == emptyLocation) { - groupLocation = null; - } else if (doingRackLocal) { - for (SplitHolder splitH : group) { - String[] locations = splitH.split.getLocations(); - if (locations != null) { - for (String loc : locations) { - if (loc != null) { - groupLocationSet.add(loc); - } - } - } - } - groupLocation = 
groupLocationSet.toArray(groupLocation); - } - TezGroupedSplit groupedSplit = - new TezGroupedSplit(group.size(), wrappedInputFormatName, - groupLocation, - // pass rack local hint directly to AM - ((doingRackLocal && location != emptyLocation)?location:null)); - for (SplitHolder groupedSplitHolder : group) { - groupedSplit.addSplit(groupedSplitHolder.split); - Preconditions.checkState(groupedSplitHolder.isProcessed == false, - "Duplicates in grouping at location: " + location); - groupedSplitHolder.isProcessed = true; - splitsProcessed++; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Grouped " + group.size() - + " length: " + groupedSplit.getLength() - + " split at: " + location); - } - groupedSplitsList.add(groupedSplit); - } - - if (!doingRackLocal && numFullGroupsCreated < 1) { - // no node could create a node-local group. go rack-local - doingRackLocal = true; - // re-create locations - int numRemainingSplits = originalSplits.length - splitsProcessed; - Set<InputSplit> remainingSplits = new HashSet<InputSplit>(numRemainingSplits); - // gather remaining splits. 
- for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) { - LocationHolder locHolder = entry.getValue(); - while (!locHolder.isEmpty()) { - SplitHolder splitHolder = locHolder.getUnprocessedHeadSplit(); - if (splitHolder != null) { - remainingSplits.add(splitHolder.split); - locHolder.incrementHeadIndex(); - } - } - } - if (remainingSplits.size() != numRemainingSplits) { - throw new TezUncheckedException("Expected: " + numRemainingSplits - + " got: " + remainingSplits.size()); - } - - // doing all this now instead of up front because the number of remaining - // splits is expected to be much smaller - RackResolver.init(conf); - Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size()); - Map<String, LocationHolder> rackLocations = createLocationsMap(conf); - for (String location : distinctLocations.keySet()) { - String rack = emptyLocation; - if (location != emptyLocation) { - rack = RackResolver.resolve(location).getNetworkLocation(); - } - locToRackMap.put(location, rack); - if (rackLocations.get(rack) == null) { - // splits will probably be located in all racks - rackLocations.put(rack, new LocationHolder(numRemainingSplits)); - } - } - distinctLocations.clear(); - HashSet<String> rackSet = new HashSet<String>(rackLocations.size()); - int numRackSplitsToGroup = remainingSplits.size(); - for (InputSplit split : originalSplits) { - if (numRackSplitsToGroup == 0) { - break; - } - // Iterate through the original splits in their order and consider them for grouping. 
- // This maintains the original ordering in the list and thus subsequent grouping will - // maintain that order - if (!remainingSplits.contains(split)) { - continue; - } - numRackSplitsToGroup--; - rackSet.clear(); - SplitHolder splitHolder = new SplitHolder(split); - String[] locations = split.getLocations(); - if (locations == null || locations.length == 0) { - locations = emptyLocations; + List<SplitContainer> originalSplitContainers = Lists.transform(Arrays.asList(originalSplits), + new Function<InputSplit, SplitContainer>() { + @Override + public SplitContainer apply(InputSplit input) { + return new MapredSplitContainer(input); } - for (String location : locations ) { - if ( location == null) { - location = emptyLocation; + }); + + try { + List<InputSplit> resultList = Lists.transform(super + .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits, + wrappedInputFormatName, estimator == null ? null : + new SplitSizeEstimatorWrapperMapred(estimator)), + new Function<GroupedSplitContainer, InputSplit>() { + @Override + public InputSplit apply(GroupedSplitContainer input) { + List<InputSplit> underlyingSplits = Lists.transform(input.getWrappedSplitContainers(), + new Function<SplitContainer, InputSplit>() { + @Override + public InputSplit apply(SplitContainer input) { + return ((MapredSplitContainer) input).getRawSplit(); + } + }); + + + return new TezGroupedSplit(underlyingSplits, input.getWrappedInputFormatName(), + input.getLocations(), input.getRack(), input.getLength()); } - rackSet.add(locToRackMap.get(location)); - } - for (String rack : rackSet) { - rackLocations.get(rack).splits.add(splitHolder); - } - } - remainingSplits.clear(); - distinctLocations = rackLocations; - // adjust split length to be smaller because the data is non local - float rackSplitReduction = conf.getFloat( - TezMapReduceSplitsGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION, - TezMapReduceSplitsGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT); - if 
(rackSplitReduction > 0) { - long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction); - int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction); - if (newLengthPerGroup > 0) { - lengthPerGroup = newLengthPerGroup; - } - if (newNumSplitsInGroup > 0) { - numSplitsInGroup = newNumSplitsInGroup; - } - } - - LOG.info("Doing rack local after iteration: " + iterations + - " splitsProcessed: " + splitsProcessed + - " numFullGroupsInRound: " + numFullGroupsCreated + - " totalGroups: " + groupedSplitsList.size() + - " lengthPerGroup: " + lengthPerGroup + - " numSplitsInGroup: " + numSplitsInGroup); - - // dont do smallGroups for the first pass - continue; - } - - if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) { - // a few nodes have a lot of data or data is thinly spread across nodes - // so allow small groups now - allowSmallGroups = true; - LOG.info("Allowing small groups after iteration: " + iterations + - " splitsProcessed: " + splitsProcessed + - " numFullGroupsInRound: " + numFullGroupsCreated + - " totalGroups: " + groupedSplitsList.size()); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Iteration: " + iterations + - " splitsProcessed: " + splitsProcessed + - " numFullGroupsInRound: " + numFullGroupsCreated + - " totalGroups: " + groupedSplitsList.size()); - } + }); + InputSplit[] resultArr = resultList.toArray(new InputSplit[resultList.size()]); + return resultArr; + } catch (InterruptedException e) { + throw new IOException(e); } - InputSplit[] groupedSplits = new InputSplit[groupedSplitsList.size()]; - groupedSplitsList.toArray(groupedSplits); - LOG.info("Number of splits desired: " + desiredNumSplits + - " created: " + groupedSplitsList.size() + - " splitsProcessed: " + splitsProcessed); - return groupedSplits; } - } http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java 
---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java index 430d2ec..2d198ad 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configurable; @@ -50,11 +51,25 @@ public class TezGroupedSplit extends InputSplit String rack = null; long length = 0; Configuration conf; - + + @InterfaceAudience.Private public TezGroupedSplit() { } - + + @InterfaceAudience.Private + /** + * Meant for internal usage only + */ + public TezGroupedSplit(List<InputSplit> wrappedSplits, String wrappedInputFormatName, + String[] locations, String rack, long length) { + this.wrappedSplits = wrappedSplits; + this.wrappedInputFormatName = wrappedInputFormatName; + this.locations = locations; + this.rack = rack; + this.length = length; + } + public TezGroupedSplit(int numSplits, String wrappedInputFormatName, String[] locations, String rack) { this.wrappedSplits = new ArrayList<InputSplit>(numSplits); http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java index 4be3931..87729bd 100644 --- 
a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java @@ -19,26 +19,24 @@ package org.apache.hadoop.mapreduce.split; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; import javax.annotation.Nullable; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.tez.mapreduce.grouper.GroupedSplitContainer; +import org.apache.tez.mapreduce.grouper.MapReduceSplitContainer; +import org.apache.tez.mapreduce.grouper.SplitContainer; +import org.apache.tez.mapreduce.grouper.SplitSizeEstimatorWrapperMapReduce; +import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.tez.dag.api.TezUncheckedException; - -import com.google.common.base.Preconditions; /** * Helper that provides a grouping of input splits based @@ -47,117 +45,94 @@ import com.google.common.base.Preconditions; */ @Public @Evolving -public class TezMapReduceSplitsGrouper { +public class TezMapReduceSplitsGrouper extends TezSplitGrouper { private static final Logger LOG = LoggerFactory.getLogger(TezMapReduceSplitsGrouper.class); /** - * Specify the number of splits desired to be created + * @deprecated See equivalent in {@link TezSplitGrouper} */ - public static final String TEZ_GROUPING_SPLIT_COUNT = "tez.grouping.split-count"; + @Deprecated + public static final String 
TEZ_GROUPING_SPLIT_COUNT = TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT; + /** - * Limit the number of splits in a group by the total length of the splits in the group + * @deprecated See equivalent in {@link TezSplitGrouper} */ - public static final String TEZ_GROUPING_SPLIT_BY_LENGTH = "tez.grouping.by-length"; - public static final boolean TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT = true; + @Deprecated + public static final String TEZ_GROUPING_SPLIT_BY_LENGTH = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH; /** - * Limit the number of splits in a group by the number of splits in the group + * @deprecated See equivalent in {@link TezSplitGrouper} */ - public static final String TEZ_GROUPING_SPLIT_BY_COUNT = "tez.grouping.by-count"; - public static final boolean TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT = false; + @Deprecated + public static final boolean TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT; /** - * The multiplier for available queue capacity when determining number of - * tasks for a Vertex. 1.7 with 100% queue available implies generating a - * number of tasks roughly equal to 170% of the available containers on the - * queue. This enables multiple waves of mappers where the final wave is slightly smaller - * than the remaining waves. The gap helps overlap the final wave with any slower tasks - * from previous waves and tries to hide the delays from the slower tasks. Good values for - * this are 1.7, 2.7, 3.7 etc. Increase the number of waves to make the tasks smaller or - * shorter. 
+ * @deprecated See equivalent in {@link TezSplitGrouper} */ - public static final String TEZ_GROUPING_SPLIT_WAVES = "tez.grouping.split-waves"; - public static final float TEZ_GROUPING_SPLIT_WAVES_DEFAULT = 1.7f; + @Deprecated + public static final String TEZ_GROUPING_SPLIT_BY_COUNT = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT; + /** + * @deprecated See equivalent in {@link TezSplitGrouper} + */ + @Deprecated + public static final boolean TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT; /** - * Upper bound on the size (in bytes) of a grouped split, to avoid generating excessively large splits. + * @deprecated See equivalent in {@link TezSplitGrouper} + */ + @Deprecated + public static final String TEZ_GROUPING_SPLIT_WAVES = TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES; + /** + * @deprecated See equivalent in {@link TezSplitGrouper} */ - public static final String TEZ_GROUPING_SPLIT_MAX_SIZE = "tez.grouping.max-size"; - public static final long TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT = 1024*1024*1024L; + @Deprecated + public static final float TEZ_GROUPING_SPLIT_WAVES_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT; /** - * Lower bound on the size (in bytes) of a grouped split, to avoid generating too many small splits. + * @deprecated See equivalent in {@link TezSplitGrouper} + */ + @Deprecated + public static final String TEZ_GROUPING_SPLIT_MAX_SIZE = TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE; + /** + * @deprecated See equivalent in {@link TezSplitGrouper} */ - public static final String TEZ_GROUPING_SPLIT_MIN_SIZE = "tez.grouping.min-size"; - public static final long TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT = 50*1024*1024L; + @Deprecated + public static final long TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT; /** - * This factor is used to decrease the per group desired (length and count) limits for groups - * created by combining splits within a rack. 
Since reading this split involves reading data intra - * rack, the group is made smaller to cover up for the increased latencies in doing intra rack - * reads. The value should be a fraction <= 1. + * @deprecated See equivalent in {@link TezSplitGrouper} */ - public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION = - "tez.grouping.rack-split-reduction"; - public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = 0.75f; - + @Deprecated + public static final String TEZ_GROUPING_SPLIT_MIN_SIZE = TezSplitGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE; /** - * Repeated invocations of grouping on the same splits with the same parameters will produce the - * same groups. This may help in cache reuse but may cause hot-spotting on nodes when there are a - * large number of jobs reading the same hot data. True by default. + * @deprecated See equivalent in {@link TezSplitGrouper} */ - public static final String TEZ_GROUPING_REPEATABLE = "tez.grouping.repeatable"; - public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = true; + @Deprecated + public static final long TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT; - static class SplitHolder { - InputSplit split; - boolean isProcessed = false; - SplitHolder(InputSplit split) { - this.split = split; - } - } - - static class LocationHolder { - List<SplitHolder> splits; - int headIndex = 0; - LocationHolder(int capacity) { - splits = new ArrayList<SplitHolder>(capacity); - } - boolean isEmpty() { - return (headIndex == splits.size()); - } - SplitHolder getUnprocessedHeadSplit() { - while (!isEmpty()) { - SplitHolder holder = splits.get(headIndex); - if (!holder.isProcessed) { - return holder; - } - incrementHeadIndex(); - } - return null; - } - void incrementHeadIndex() { - headIndex++; - } - } + /** + * @deprecated See equivalent in {@link TezSplitGrouper} + */ + @Deprecated + public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION = + 
TezSplitGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION; + /** + * @deprecated See equivalent in {@link TezSplitGrouper} + */ + @Deprecated + public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = TezSplitGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT; - private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator(); + /** + * @deprecated See equivalent in {@link TezSplitGrouper} + */ + @Deprecated + public static final String TEZ_GROUPING_REPEATABLE = TezSplitGrouper.TEZ_GROUPING_REPEATABLE; + /** + * @deprecated See equivalent in {@link TezSplitGrouper} + */ + @Deprecated + public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = TezSplitGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT; - static final class DefaultSplitSizeEstimator implements SplitSizeEstimator { - @Override - public long getEstimatedSize(InputSplit split) throws InterruptedException, - IOException { - return split.getLength(); - } - } - Map<String, LocationHolder> createLocationsMap(Configuration conf) { - if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, - TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) { - return new TreeMap<String, LocationHolder>(); - } - return new HashMap<String, LocationHolder>(); - } - public List<InputSplit> getGroupedSplits(Configuration conf, List<InputSplit> originalSplits, int desiredNumSplits, String wrappedInputFormatName) throws IOException, InterruptedException { @@ -166,370 +141,48 @@ public class TezMapReduceSplitsGrouper { } public List<InputSplit> getGroupedSplits(Configuration conf, - List<InputSplit> originalSplits, int desiredNumSplits, - String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException, InterruptedException { - LOG.info("Grouping splits in Tez"); + List<InputSplit> originalSplits, int desiredNumSplits, + String wrappedInputFormatName, + SplitSizeEstimator estimator) throws IOException, + InterruptedException { 
Preconditions.checkArgument(originalSplits != null, "Splits must be specified"); + List<SplitContainer> originalSplitContainers = Lists.transform(originalSplits, + new Function<InputSplit, SplitContainer>() { + @Override + public SplitContainer apply(InputSplit input) { + return new MapReduceSplitContainer(input); + } + }); - int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0); - if (configNumSplits > 0) { - // always use config override if specified - desiredNumSplits = configNumSplits; - LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits); - } - - if (estimator == null) { - estimator = DEFAULT_SPLIT_ESTIMATOR; - } - if (! (configNumSplits > 0 || - originalSplits.size() == 0)) { - // numSplits has not been overridden by config - // numSplits has been set at runtime - // there are splits generated - // desired splits is less than number of splits generated - // Do sanity checks - long totalLength = 0; - for (InputSplit split : originalSplits) { - totalLength += estimator.getEstimatedSize(split); - } - - int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size(); - long lengthPerGroup = totalLength/splitCount; + return Lists.transform(super + .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits, + wrappedInputFormatName, estimator == null ? null : + new SplitSizeEstimatorWrapperMapReduce(estimator)), + new Function<GroupedSplitContainer, InputSplit>() { + @Override + public InputSplit apply(GroupedSplitContainer input) { - long maxLengthPerGroup = conf.getLong( - TEZ_GROUPING_SPLIT_MAX_SIZE, - TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT); - long minLengthPerGroup = conf.getLong( - TEZ_GROUPING_SPLIT_MIN_SIZE, - TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT); - if (maxLengthPerGroup < minLengthPerGroup || - minLengthPerGroup <=0) { - throw new TezUncheckedException( - "Invalid max/min group lengths. Required min>0, max>=min. 
" + - " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup); - } - if (lengthPerGroup > maxLengthPerGroup) { - // splits too big to work. Need to override with max size. - int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1; - LOG.info("Desired splits: " + desiredNumSplits + " too small. " + - " Desired splitLength: " + lengthPerGroup + - " Max splitLength: " + maxLengthPerGroup + - " New desired splits: " + newDesiredNumSplits + - " Total length: " + totalLength + - " Original splits: " + originalSplits.size()); - - desiredNumSplits = newDesiredNumSplits; - } else if (lengthPerGroup < minLengthPerGroup) { - // splits too small to work. Need to override with size. - int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1; - LOG.info("Desired splits: " + desiredNumSplits + " too large. " + - " Desired splitLength: " + lengthPerGroup + - " Min splitLength: " + minLengthPerGroup + - " New desired splits: " + newDesiredNumSplits + - " Total length: " + totalLength + - " Original splits: " + originalSplits.size()); - - desiredNumSplits = newDesiredNumSplits; - } - } - - List<InputSplit> groupedSplits = null; - - if (desiredNumSplits == 0 || - originalSplits.size() == 0 || - desiredNumSplits >= originalSplits.size()) { - // nothing set. 
so return all the splits as is - LOG.info("Using original number of splits: " + originalSplits.size() + - " desired splits: " + desiredNumSplits); - groupedSplits = new ArrayList<InputSplit>(originalSplits.size()); - for (InputSplit split : originalSplits) { - TezGroupedSplit newSplit = - new TezGroupedSplit(1, wrappedInputFormatName, split.getLocations()); - newSplit.addSplit(split); - groupedSplits.add(newSplit); - } - return groupedSplits; - } - - String emptyLocation = "EmptyLocation"; - String[] emptyLocations = {emptyLocation}; - groupedSplits = new ArrayList<InputSplit>(desiredNumSplits); - - long totalLength = 0; - Map<String, LocationHolder> distinctLocations = createLocationsMap(conf); - // go through splits and add them to locations - for (InputSplit split : originalSplits) { - totalLength += estimator.getEstimatedSize(split); - String[] locations = split.getLocations(); - if (locations == null || locations.length == 0) { - locations = emptyLocations; - } - for (String location : locations ) { - if (location == null) { - location = emptyLocation; - } - distinctLocations.put(location, null); - } - } - - long lengthPerGroup = totalLength/desiredNumSplits; - int numNodeLocations = distinctLocations.size(); - int numSplitsPerLocation = originalSplits.size()/numNodeLocations; - int numSplitsInGroup = originalSplits.size()/desiredNumSplits; - - // allocation loop here so that we have a good initial size for the lists - for (String location : distinctLocations.keySet()) { - distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1)); - } - - Set<String> locSet = new HashSet<String>(); - for (InputSplit split : originalSplits) { - locSet.clear(); - SplitHolder splitHolder = new SplitHolder(split); - String[] locations = split.getLocations(); - if (locations == null || locations.length == 0) { - locations = emptyLocations; - } - for (String location : locations) { - if (location == null) { - location = emptyLocation; - } - locSet.add(location); - 
} - for (String location : locSet) { - LocationHolder holder = distinctLocations.get(location); - holder.splits.add(splitHolder); - } - } - - boolean groupByLength = conf.getBoolean( - TEZ_GROUPING_SPLIT_BY_LENGTH, - TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT); - boolean groupByCount = conf.getBoolean( - TEZ_GROUPING_SPLIT_BY_COUNT, - TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT); - if (!(groupByLength || groupByCount)) { - throw new TezUncheckedException( - "None of the grouping parameters are true: " - + TEZ_GROUPING_SPLIT_BY_LENGTH + ", " - + TEZ_GROUPING_SPLIT_BY_COUNT); - } - LOG.info("Desired numSplits: " + desiredNumSplits + - " lengthPerGroup: " + lengthPerGroup + - " numLocations: " + numNodeLocations + - " numSplitsPerLocation: " + numSplitsPerLocation + - " numSplitsInGroup: " + numSplitsInGroup + - " totalLength: " + totalLength + - " numOriginalSplits: " + originalSplits.size() + - " . Grouping by length: " + groupByLength + " count: " + groupByCount); - - // go through locations and group splits - int splitsProcessed = 0; - List<SplitHolder> group = new ArrayList<SplitHolder>(numSplitsInGroup); - Set<String> groupLocationSet = new HashSet<String>(10); - boolean allowSmallGroups = false; - boolean doingRackLocal = false; - int iterations = 0; - while (splitsProcessed < originalSplits.size()) { - iterations++; - int numFullGroupsCreated = 0; - for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) { - group.clear(); - groupLocationSet.clear(); - String location = entry.getKey(); - LocationHolder holder = entry.getValue(); - SplitHolder splitHolder = holder.getUnprocessedHeadSplit(); - if (splitHolder == null) { - // all splits on node processed - continue; - } - int oldHeadIndex = holder.headIndex; - long groupLength = 0; - int groupNumSplits = 0; - do { - group.add(splitHolder); - groupLength += estimator.getEstimatedSize(splitHolder.split); - groupNumSplits++; - holder.incrementHeadIndex(); - splitHolder = holder.getUnprocessedHeadSplit(); - } 
while(splitHolder != null - && (!groupByLength || - (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup)) - && (!groupByCount || - (groupNumSplits + 1 <= numSplitsInGroup))); + List<InputSplit> underlyingSplits = Lists.transform(input.getWrappedSplitContainers(), + new Function<SplitContainer, InputSplit>() { + @Override + public InputSplit apply(SplitContainer input) { + return ((MapReduceSplitContainer) input).getRawSplit(); + } + }); - if (holder.isEmpty() - && !allowSmallGroups - && (!groupByLength || groupLength < lengthPerGroup/2) - && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) { - // group too small, reset it - holder.headIndex = oldHeadIndex; - continue; - } - - numFullGroupsCreated++; + return new TezGroupedSplit(underlyingSplits, input.getWrappedInputFormatName(), + input.getLocations(), input.getRack(), input.getLength()); - // One split group created - String[] groupLocation = {location}; - if (location == emptyLocation) { - groupLocation = null; - } else if (doingRackLocal) { - for (SplitHolder splitH : group) { - String[] locations = splitH.split.getLocations(); - if (locations != null) { - for (String loc : locations) { - if (loc != null) { - groupLocationSet.add(loc); - } - } - } - } - groupLocation = groupLocationSet.toArray(groupLocation); - } - TezGroupedSplit groupedSplit = - new TezGroupedSplit(group.size(), wrappedInputFormatName, - groupLocation, - // pass rack local hint directly to AM - ((doingRackLocal && location != emptyLocation)?location:null)); - for (SplitHolder groupedSplitHolder : group) { - groupedSplit.addSplit(groupedSplitHolder.split); - Preconditions.checkState(groupedSplitHolder.isProcessed == false, - "Duplicates in grouping at location: " + location); - groupedSplitHolder.isProcessed = true; - splitsProcessed++; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Grouped " + group.size() - + " length: " + groupedSplit.getLength() - + " split at: " + location); - } - 
groupedSplits.add(groupedSplit); - } - - if (!doingRackLocal && numFullGroupsCreated < 1) { - // no node could create a node-local group. go rack-local - doingRackLocal = true; - // re-create locations - int numRemainingSplits = originalSplits.size() - splitsProcessed; - Set<InputSplit> remainingSplits = new HashSet<InputSplit>(numRemainingSplits); - // gather remaining splits. - for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) { - LocationHolder locHolder = entry.getValue(); - while (!locHolder.isEmpty()) { - SplitHolder splitHolder = locHolder.getUnprocessedHeadSplit(); - if (splitHolder != null) { - remainingSplits.add(splitHolder.split); - locHolder.incrementHeadIndex(); - } } - } - if (remainingSplits.size() != numRemainingSplits) { - throw new TezUncheckedException("Expected: " + numRemainingSplits - + " got: " + remainingSplits.size()); - } - - // doing all this now instead of up front because the number of remaining - // splits is expected to be much smaller - RackResolver.init(conf); - Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size()); - Map<String, LocationHolder> rackLocations = createLocationsMap(conf); - for (String location : distinctLocations.keySet()) { - String rack = emptyLocation; - if (location != emptyLocation) { - rack = RackResolver.resolve(location).getNetworkLocation(); - } - locToRackMap.put(location, rack); - if (rackLocations.get(rack) == null) { - // splits will probably be located in all racks - rackLocations.put(rack, new LocationHolder(numRemainingSplits)); - } - } - distinctLocations.clear(); - HashSet<String> rackSet = new HashSet<String>(rackLocations.size()); - int numRackSplitsToGroup = remainingSplits.size(); - for (InputSplit split : originalSplits) { - if (numRackSplitsToGroup == 0) { - break; - } - // Iterate through the original splits in their order and consider them for grouping. 
- // This maintains the original ordering in the list and thus subsequent grouping will - // maintain that order - if (!remainingSplits.contains(split)) { - continue; - } - numRackSplitsToGroup--; - rackSet.clear(); - SplitHolder splitHolder = new SplitHolder(split); - String[] locations = split.getLocations(); - if (locations == null || locations.length == 0) { - locations = emptyLocations; - } - for (String location : locations ) { - if (location == null) { - location = emptyLocation; - } - rackSet.add(locToRackMap.get(location)); - } - for (String rack : rackSet) { - rackLocations.get(rack).splits.add(splitHolder); - } - } - - remainingSplits.clear(); - distinctLocations = rackLocations; - // adjust split length to be smaller because the data is non local - float rackSplitReduction = conf.getFloat( - TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION, - TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT); - if (rackSplitReduction > 0) { - long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction); - int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction); - if (newLengthPerGroup > 0) { - lengthPerGroup = newLengthPerGroup; - } - if (newNumSplitsInGroup > 0) { - numSplitsInGroup = newNumSplitsInGroup; - } - } - - LOG.info("Doing rack local after iteration: " + iterations + - " splitsProcessed: " + splitsProcessed + - " numFullGroupsInRound: " + numFullGroupsCreated + - " totalGroups: " + groupedSplits.size() + - " lengthPerGroup: " + lengthPerGroup + - " numSplitsInGroup: " + numSplitsInGroup); - - // dont do smallGroups for the first pass - continue; - } - - if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) { - // a few nodes have a lot of data or data is thinly spread across nodes - // so allow small groups now - allowSmallGroups = true; - LOG.info("Allowing small groups after iteration: " + iterations + - " splitsProcessed: " + splitsProcessed + - " numFullGroupsInRound: " + numFullGroupsCreated + - " totalGroups: " + 
groupedSplits.size()); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Iteration: " + iterations + - " splitsProcessed: " + splitsProcessed + - " numFullGroupsInRound: " + numFullGroupsCreated + - " totalGroups: " + groupedSplits.size()); - } - } - LOG.info("Number of splits desired: " + desiredNumSplits + - " created: " + groupedSplits.size() + - " splitsProcessed: " + splitsProcessed); - return groupedSplits; + }); } - + /** * Builder that can be used to configure grouping in Tez - * + * + * @deprecated See {@link org.apache.tez.mapreduce.grouper.TezSplitGrouper.TezMRSplitsGrouperConfigBuilder#newConfigBuilder(Configuration)} + * * @param conf * {@link Configuration} This will be modified in place. If * configuration values may be changed at runtime via a config file @@ -538,10 +191,15 @@ public class TezMapReduceSplitsGrouper { * be derived from the Configuration object. * @return {@link org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper.TezMRSplitsGrouperConfigBuilder} */ + @Deprecated public static TezMRSplitsGrouperConfigBuilder createConfigBuilder(Configuration conf) { return new TezMRSplitsGrouperConfigBuilder(conf); - } + } + /** + * @deprecated See {@link org.apache.tez.mapreduce.grouper.TezSplitGrouper.TezMRSplitsGrouperConfigBuilder} + */ + @Deprecated public static final class TezMRSplitsGrouperConfigBuilder { private final Configuration conf; @@ -556,27 +214,27 @@ public class TezMapReduceSplitsGrouper { } public TezMRSplitsGrouperConfigBuilder setGroupSplitCount(int count) { - this.conf.setInt(TEZ_GROUPING_SPLIT_COUNT, count); + this.conf.setInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT, count); return this; } public TezMRSplitsGrouperConfigBuilder setGroupSplitByCount(boolean enabled) { - this.conf.setBoolean(TEZ_GROUPING_SPLIT_BY_COUNT, enabled); + this.conf.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT, enabled); return this; } public TezMRSplitsGrouperConfigBuilder setGroupSplitByLength(boolean enabled) { - 
this.conf.setBoolean(TEZ_GROUPING_SPLIT_BY_LENGTH, enabled); + this.conf.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH, enabled); return this; } public TezMRSplitsGrouperConfigBuilder setGroupSplitWaves(float multiplier) { - this.conf.setFloat(TEZ_GROUPING_SPLIT_WAVES, multiplier); + this.conf.setFloat(TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES, multiplier); return this; } public TezMRSplitsGrouperConfigBuilder setGroupingRackSplitSizeReduction(float rackSplitSizeReduction) { - this.conf.setFloat(TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION, rackSplitSizeReduction); + this.conf.setFloat(TezSplitGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION, rackSplitSizeReduction); return this; } @@ -584,8 +242,8 @@ public class TezMapReduceSplitsGrouper { * upper and lower bounds for the splits */ public TezMRSplitsGrouperConfigBuilder setGroupingSplitSize(long lowerBound, long upperBound) { - this.conf.setLong(TEZ_GROUPING_SPLIT_MIN_SIZE, lowerBound); - this.conf.setLong(TEZ_GROUPING_SPLIT_MAX_SIZE, upperBound); + this.conf.setLong(TezSplitGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE, lowerBound); + this.conf.setLong(TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE, upperBound); return this; } @@ -593,5 +251,4 @@ public class TezMapReduceSplitsGrouper { return this.conf; } } - } http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java index b93e4ba..ac64bf7 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java @@ -23,6 +23,7 @@ import java.util.List; import com.google.common.base.Stopwatch; import 
com.google.common.collect.Lists; +import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -97,8 +98,8 @@ public class MRInputAMSplitGenerator extends InputInitializer { int totalResource = getContext().getTotalAvailableResource().getMemory(); int taskResource = getContext().getVertexTaskResource().getMemory(); float waves = conf.getFloat( - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES, - TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT); + TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES, + TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT); int numTasks = (int)((totalResource*waves)/taskResource); http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java new file mode 100644 index 0000000..c236257 --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java @@ -0,0 +1,74 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.mapreduce.grouper; + + +import java.util.List; + +import com.google.common.collect.Lists; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.tez.dag.api.TezUncheckedException; + + +/** + * An entity to hold grouped splits - either mapred or mapreduce. + */ [email protected] +public class GroupedSplitContainer { + + private final List<SplitContainer> wrappedSplits; + private final String wrappedInputFormatName; + private final String[] locations; + private final String rack; + long length = 0; + + public GroupedSplitContainer(int numSplits, String wrappedInputFormatName, + String[] locations, String rack) { + this.wrappedSplits = Lists.newArrayListWithCapacity(numSplits); + this.wrappedInputFormatName = wrappedInputFormatName; + this.locations = locations; + this.rack = rack; + } + + + public void addSplit(SplitContainer splitContainer) { + wrappedSplits.add(splitContainer); + try { + length += splitContainer.getLength(); + } catch (Exception e) { + throw new TezUncheckedException(e); + } + } + + public long getLength() { + return length; + } + + public String getWrappedInputFormatName() { + return this.wrappedInputFormatName; + } + + public List<SplitContainer> getWrappedSplitContainers() { + return this.wrappedSplits; + } + + public String[] getLocations() { + return this.locations; + } + + public String getRack() { + return this.rack; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java new file mode 100644 index 0000000..63e2138 --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java @@ -0,0 
+1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce.grouper; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.mapreduce.InputSplit; + +public class MapReduceSplitContainer extends SplitContainer { + + private final InputSplit inputSplit; + + public MapReduceSplitContainer(InputSplit inputSplit) { + Preconditions.checkNotNull(inputSplit); + this.inputSplit = inputSplit; + } + + @Override + public String[] getPreferredLocations() throws IOException, InterruptedException { + return inputSplit.getLocations(); + } + + @Override + public long getLength() throws IOException, InterruptedException { + return inputSplit.getLength(); + } + + public InputSplit getRawSplit() { + return this.inputSplit; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MapReduceSplitContainer that = (MapReduceSplitContainer) o; + + return !(inputSplit != null ? !inputSplit.equals(that.inputSplit) : that.inputSplit != null); + + } + + @Override + public int hashCode() { + return inputSplit != null ? 
inputSplit.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java new file mode 100644 index 0000000..f7dbfda --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.mapreduce.grouper; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.mapred.InputSplit; + +public class MapredSplitContainer extends SplitContainer { + + private final InputSplit inputSplit; + + public MapredSplitContainer(InputSplit inputSplit) { + Preconditions.checkNotNull(inputSplit); + this.inputSplit = inputSplit; + } + + @Override + public String[] getPreferredLocations() throws IOException { + return inputSplit.getLocations(); + } + + @Override + public long getLength() throws IOException { + return inputSplit.getLength(); + } + + public InputSplit getRawSplit() { + return this.inputSplit; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MapredSplitContainer that = (MapredSplitContainer) o; + + return !(inputSplit != null ? !inputSplit.equals(that.inputSplit) : that.inputSplit != null); + + } + + @Override + public int hashCode() { + return inputSplit != null ? inputSplit.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java new file mode 100644 index 0000000..383b9ca --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce.grouper; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; + [email protected] +/** + * Interface to represent both mapred and mapreduce splits + */ +public abstract class SplitContainer { + + private boolean isProcessed = false; + + + public abstract String[] getPreferredLocations() throws IOException, InterruptedException; + + public abstract long getLength() throws IOException, InterruptedException; + + public boolean isProcessed() { + return isProcessed; + } + + public void setIsProcessed(boolean val) { + this.isProcessed = val; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java new file mode 100644 index 0000000..ebb33ad --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce.grouper; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; + + +/** + * An interface to handle split size estimation across mapred and mapreduce splits + */ [email protected] +public interface SplitSizeEstimatorWrapper { + + long getEstimatedSize(SplitContainer splitContainer) throws IOException, InterruptedException; + +} http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java new file mode 100644 index 0000000..df6e3c9 --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce.grouper; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.split.SplitSizeEstimator; + +public class SplitSizeEstimatorWrapperMapReduce implements SplitSizeEstimatorWrapper { + + private final SplitSizeEstimator estimator; + + public SplitSizeEstimatorWrapperMapReduce(SplitSizeEstimator estimator) { + this.estimator = estimator; + } + + @Override + public long getEstimatedSize(SplitContainer rawContainer) throws IOException, + InterruptedException { + MapReduceSplitContainer splitContainer = (MapReduceSplitContainer)rawContainer; + return estimator.getEstimatedSize(splitContainer.getRawSplit()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java new file mode 100644 index 0000000..6dd3a56 --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.mapreduce.grouper; + +import java.io.IOException; + +import org.apache.hadoop.mapred.split.SplitSizeEstimator; + +public class SplitSizeEstimatorWrapperMapred implements SplitSizeEstimatorWrapper { + + private final SplitSizeEstimator estimator; + + public SplitSizeEstimatorWrapperMapred(SplitSizeEstimator estimator) { + this.estimator = estimator; + } + + @Override + public long getEstimatedSize(SplitContainer rawContainer) throws IOException, + InterruptedException { + MapredSplitContainer splitContainer = (MapredSplitContainer)rawContainer; + return estimator.getEstimatedSize(splitContainer.getRawSplit()); + } +}
