Repository: tez
Updated Branches:
  refs/heads/master c89e352e0 -> 6632903bb


http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
new file mode 100644
index 0000000..eb616a0
--- /dev/null
+++ 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -0,0 +1,571 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class TezSplitGrouper {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TezSplitGrouper.class);
+
+  /**
+   * Specify the number of splits desired to be created
+   */
+  public static final String TEZ_GROUPING_SPLIT_COUNT = 
"tez.grouping.split-count";
+  /**
+   * Limit the number of splits in a group by the total length of the splits 
in the group
+   */
+  public static final String TEZ_GROUPING_SPLIT_BY_LENGTH = 
"tez.grouping.by-length";
+  public static final boolean TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT = true;
+  /**
+   * Limit the number of splits in a group by the number of splits in the group
+   */
+  public static final String TEZ_GROUPING_SPLIT_BY_COUNT = 
"tez.grouping.by-count";
+  public static final boolean TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT = false;
+
+  /**
+   * The multiplier for available queue capacity when determining number of
+   * tasks for a Vertex. 1.7 with 100% queue available implies generating a
+   * number of tasks roughly equal to 170% of the available containers on the
+   * queue. This enables multiple waves of mappers where the final wave is 
slightly smaller
+   * than the remaining waves. The gap helps overlap the final wave with any 
slower tasks
+   * from previous waves and tries to hide the delays from the slower tasks. 
Good values for
+   * this are 1.7, 2.7, 3.7 etc. Increase the number of waves to make the 
tasks smaller or
+   * shorter.
+   */
+  public static final String TEZ_GROUPING_SPLIT_WAVES = 
"tez.grouping.split-waves";
+  public static final float TEZ_GROUPING_SPLIT_WAVES_DEFAULT = 1.7f;
+
+  /**
+   * Upper bound on the size (in bytes) of a grouped split, to avoid 
generating excessively large splits.
+   */
+  public static final String TEZ_GROUPING_SPLIT_MAX_SIZE = 
"tez.grouping.max-size";
+  public static final long TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT = 
1024*1024*1024L;
+
+  /**
+   * Lower bound on the size (in bytes) of a grouped split, to avoid 
generating too many small splits.
+   */
+  public static final String TEZ_GROUPING_SPLIT_MIN_SIZE = 
"tez.grouping.min-size";
+  public static final long TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT = 50*1024*1024L;
+
+  /**
+   * This factor is used to decrease the per group desired (length and count) 
limits for groups
+   * created by combining splits within a rack. Since reading this split 
involves reading data intra
+   * rack, the group is made smaller to cover up for the increased latencies 
in doing intra rack
+   * reads. The value should be a fraction <= 1.
+   */
+  public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION =
+      "tez.grouping.rack-split-reduction";
+  public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = 
0.75f;
+
+  /**
+   * Repeated invocations of grouping on the same splits with the same 
parameters will produce the
+   * same groups. This may help in cache reuse but may cause hot-spotting on 
nodes when there are a
+   * large number of jobs reading the same hot data. True by default.
+   */
+  public static final String TEZ_GROUPING_REPEATABLE = 
"tez.grouping.repeatable";
+  public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = true;
+
+
+  static class LocationHolder {
+    List<SplitContainer> splits;
+    int headIndex = 0;
+    LocationHolder(int capacity) {
+      splits = new ArrayList<SplitContainer>(capacity);
+    }
+    boolean isEmpty() {
+      return (headIndex == splits.size());
+    }
+    SplitContainer getUnprocessedHeadSplit() {
+      while (!isEmpty()) {
+        SplitContainer holder = splits.get(headIndex);
+        if (!holder.isProcessed()) {
+          return holder;
+        }
+        incrementHeadIndex();
+      }
+      return null;
+    }
+    void incrementHeadIndex() {
+      headIndex++;
+    }
+  }
+
+  private static final SplitSizeEstimatorWrapper DEFAULT_SPLIT_ESTIMATOR = new 
DefaultSplitSizeEstimatorWrapper();
+
+  static final class DefaultSplitSizeEstimatorWrapper implements 
SplitSizeEstimatorWrapper {
+
+    @Override
+    public long getEstimatedSize(SplitContainer splitContainer) throws 
IOException,
+        InterruptedException {
+      return splitContainer.getLength();
+    }
+  }
+
+  Map<String, LocationHolder> createLocationsMap(Configuration conf) {
+    if (conf.getBoolean(TEZ_GROUPING_REPEATABLE,
+        TEZ_GROUPING_REPEATABLE_DEFAULT)) {
+      return new TreeMap<String, LocationHolder>();
+    }
+    return new HashMap<String, LocationHolder>();
+  }
+
+  public List<GroupedSplitContainer> getGroupedSplits(Configuration conf,
+                                                      List<SplitContainer> 
originalSplits,
+                                                      int desiredNumSplits,
+                                                      String 
wrappedInputFormatName,
+                                                      
SplitSizeEstimatorWrapper estimator) throws
+      IOException, InterruptedException {
+    LOG.info("Grouping splits in Tez");
+    Preconditions.checkArgument(originalSplits != null, "Splits must be 
specified");
+
+    int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0);
+    if (configNumSplits > 0) {
+      // always use config override if specified
+      desiredNumSplits = configNumSplits;
+      LOG.info("Desired numSplits overridden by config to: " + 
desiredNumSplits);
+    }
+
+    if (estimator == null) {
+      estimator = DEFAULT_SPLIT_ESTIMATOR;
+    }
+
+    if (! (configNumSplits > 0 ||
+        originalSplits.size() == 0)) {
+      // numSplits has not been overridden by config
+      // numSplits has been set at runtime
+      // there are splits generated
+      // desired splits is less than number of splits generated
+      // Do sanity checks
+      long totalLength = 0;
+      for (SplitContainer split : originalSplits) {
+        totalLength += estimator.getEstimatedSize(split);
+      }
+
+      int splitCount = 
desiredNumSplits>0?desiredNumSplits:originalSplits.size();
+      long lengthPerGroup = totalLength/splitCount;
+
+      long maxLengthPerGroup = conf.getLong(
+          TEZ_GROUPING_SPLIT_MAX_SIZE,
+          TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
+      long minLengthPerGroup = conf.getLong(
+          TEZ_GROUPING_SPLIT_MIN_SIZE,
+          TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
+      if (maxLengthPerGroup < minLengthPerGroup ||
+          minLengthPerGroup <=0) {
+        throw new TezUncheckedException(
+            "Invalid max/min group lengths. Required min>0, max>=min. " +
+                " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
+      }
+      if (lengthPerGroup > maxLengthPerGroup) {
+        // splits too big to work. Need to override with max size.
+        int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
+        LOG.info("Desired splits: " + desiredNumSplits + " too small. " +
+            " Desired splitLength: " + lengthPerGroup +
+            " Max splitLength: " + maxLengthPerGroup +
+            " New desired splits: " + newDesiredNumSplits +
+            " Total length: " + totalLength +
+            " Original splits: " + originalSplits.size());
+
+        desiredNumSplits = newDesiredNumSplits;
+      } else if (lengthPerGroup < minLengthPerGroup) {
+        // splits too small to work. Need to override with size.
+        int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
+        LOG.info("Desired splits: " + desiredNumSplits + " too large. " +
+            " Desired splitLength: " + lengthPerGroup +
+            " Min splitLength: " + minLengthPerGroup +
+            " New desired splits: " + newDesiredNumSplits +
+            " Total length: " + totalLength +
+            " Original splits: " + originalSplits.size());
+
+        desiredNumSplits = newDesiredNumSplits;
+      }
+    }
+
+    List<GroupedSplitContainer> groupedSplits = null;
+
+    if (desiredNumSplits == 0 ||
+        originalSplits.size() == 0 ||
+        desiredNumSplits >= originalSplits.size()) {
+      // nothing set. so return all the splits as is
+      LOG.info("Using original number of splits: " + originalSplits.size() +
+          " desired splits: " + desiredNumSplits);
+      groupedSplits = new 
ArrayList<GroupedSplitContainer>(originalSplits.size());
+      for (SplitContainer split : originalSplits) {
+        GroupedSplitContainer newSplit =
+            new GroupedSplitContainer(1, wrappedInputFormatName, 
split.getPreferredLocations(),
+                null);
+        newSplit.addSplit(split);
+        groupedSplits.add(newSplit);
+      }
+      return groupedSplits;
+    }
+
+    String emptyLocation = "EmptyLocation";
+    String[] emptyLocations = {emptyLocation};
+    groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits);
+
+    long totalLength = 0;
+    Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
+    // go through splits and add them to locations
+    for (SplitContainer split : originalSplits) {
+      totalLength += estimator.getEstimatedSize(split);
+      String[] locations = split.getPreferredLocations();
+      if (locations == null || locations.length == 0) {
+        locations = emptyLocations;
+      }
+      for (String location : locations ) {
+        if (location == null) {
+          location = emptyLocation;
+        }
+        distinctLocations.put(location, null);
+      }
+    }
+
+    long lengthPerGroup = totalLength/desiredNumSplits;
+    int numNodeLocations = distinctLocations.size();
+    int numSplitsPerLocation = originalSplits.size()/numNodeLocations;
+    int numSplitsInGroup = originalSplits.size()/desiredNumSplits;
+
+    // allocation loop here so that we have a good initial size for the lists
+    for (String location : distinctLocations.keySet()) {
+      distinctLocations.put(location, new 
LocationHolder(numSplitsPerLocation+1));
+    }
+
+    Set<String> locSet = new HashSet<String>();
+    for (SplitContainer split : originalSplits) {
+      locSet.clear();
+      String[] locations = split.getPreferredLocations();
+      if (locations == null || locations.length == 0) {
+        locations = emptyLocations;
+      }
+      for (String location : locations) {
+        if (location == null) {
+          location = emptyLocation;
+        }
+        locSet.add(location);
+      }
+      for (String location : locSet) {
+        LocationHolder holder = distinctLocations.get(location);
+        holder.splits.add(split);
+      }
+    }
+
+    boolean groupByLength = conf.getBoolean(
+        TEZ_GROUPING_SPLIT_BY_LENGTH,
+        TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
+    boolean groupByCount = conf.getBoolean(
+        TEZ_GROUPING_SPLIT_BY_COUNT,
+        TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT);
+    if (!(groupByLength || groupByCount)) {
+      throw new TezUncheckedException(
+          "None of the grouping parameters are true: "
+              + TEZ_GROUPING_SPLIT_BY_LENGTH + ", "
+              + TEZ_GROUPING_SPLIT_BY_COUNT);
+    }
+    LOG.info("Desired numSplits: " + desiredNumSplits +
+        " lengthPerGroup: " + lengthPerGroup +
+        " numLocations: " + numNodeLocations +
+        " numSplitsPerLocation: " + numSplitsPerLocation +
+        " numSplitsInGroup: " + numSplitsInGroup +
+        " totalLength: " + totalLength +
+        " numOriginalSplits: " + originalSplits.size() +
+        " . Grouping by length: " + groupByLength + " count: " + groupByCount);
+
+    // go through locations and group splits
+    int splitsProcessed = 0;
+    List<SplitContainer> group = new 
ArrayList<SplitContainer>(numSplitsInGroup);
+    Set<String> groupLocationSet = new HashSet<String>(10);
+    boolean allowSmallGroups = false;
+    boolean doingRackLocal = false;
+    int iterations = 0;
+    while (splitsProcessed < originalSplits.size()) {
+      iterations++;
+      int numFullGroupsCreated = 0;
+      for (Map.Entry<String, LocationHolder> entry : 
distinctLocations.entrySet()) {
+        group.clear();
+        groupLocationSet.clear();
+        String location = entry.getKey();
+        LocationHolder holder = entry.getValue();
+        // KKK rename to splitContainer
+        SplitContainer splitContainer = holder.getUnprocessedHeadSplit();
+        if (splitContainer == null) {
+          // all splits on node processed
+          continue;
+        }
+        int oldHeadIndex = holder.headIndex;
+        long groupLength = 0;
+        int groupNumSplits = 0;
+        do {
+          group.add(splitContainer);
+          groupLength += estimator.getEstimatedSize(splitContainer);
+          groupNumSplits++;
+          holder.incrementHeadIndex();
+          splitContainer = holder.getUnprocessedHeadSplit();
+        } while(splitContainer != null
+            && (!groupByLength ||
+            (groupLength + estimator.getEstimatedSize(splitContainer) <= 
lengthPerGroup))
+            && (!groupByCount ||
+            (groupNumSplits + 1 <= numSplitsInGroup)));
+
+        if (holder.isEmpty()
+            && !allowSmallGroups
+            && (!groupByLength || groupLength < lengthPerGroup/2)
+            && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
+          // group too small, reset it
+          holder.headIndex = oldHeadIndex;
+          continue;
+        }
+
+        numFullGroupsCreated++;
+
+        // One split group created
+        String[] groupLocation = {location};
+        if (location == emptyLocation) {
+          groupLocation = null;
+        } else if (doingRackLocal) {
+          for (SplitContainer splitH : group) {
+            String[] locations = splitH.getPreferredLocations();
+            if (locations != null) {
+              for (String loc : locations) {
+                if (loc != null) {
+                  groupLocationSet.add(loc);
+                }
+              }
+            }
+          }
+          groupLocation = groupLocationSet.toArray(groupLocation);
+        }
+        GroupedSplitContainer groupedSplit =
+            new GroupedSplitContainer(group.size(), wrappedInputFormatName,
+                groupLocation,
+                // pass rack local hint directly to AM
+                ((doingRackLocal && location != emptyLocation)?location:null));
+        for (SplitContainer groupedSplitContainer : group) {
+          groupedSplit.addSplit(groupedSplitContainer);
+          Preconditions.checkState(groupedSplitContainer.isProcessed() == 
false,
+              "Duplicates in grouping at location: " + location);
+          groupedSplitContainer.setIsProcessed(true);
+          splitsProcessed++;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Grouped " + group.size()
+              + " length: " + groupedSplit.getLength()
+              + " split at: " + location);
+        }
+        groupedSplits.add(groupedSplit);
+      }
+
+      if (!doingRackLocal && numFullGroupsCreated < 1) {
+        // no node could create a node-local group. go rack-local
+        doingRackLocal = true;
+        // re-create locations
+        int numRemainingSplits = originalSplits.size() - splitsProcessed;
+        Set<SplitContainer> remainingSplits = new 
HashSet<SplitContainer>(numRemainingSplits);
+        // gather remaining splits.
+        for (Map.Entry<String, LocationHolder> entry : 
distinctLocations.entrySet()) {
+          LocationHolder locHolder = entry.getValue();
+          while (!locHolder.isEmpty()) {
+            SplitContainer splitHolder = locHolder.getUnprocessedHeadSplit();
+            if (splitHolder != null) {
+              remainingSplits.add(splitHolder);
+              locHolder.incrementHeadIndex();
+            }
+          }
+        }
+        if (remainingSplits.size() != numRemainingSplits) {
+          throw new TezUncheckedException("Expected: " + numRemainingSplits
+              + " got: " + remainingSplits.size());
+        }
+
+        // doing all this now instead of up front because the number of 
remaining
+        // splits is expected to be much smaller
+        RackResolver.init(conf);
+        Map<String, String> locToRackMap = new HashMap<String, 
String>(distinctLocations.size());
+        Map<String, LocationHolder> rackLocations = createLocationsMap(conf);
+        for (String location : distinctLocations.keySet()) {
+          String rack = emptyLocation;
+          if (location != emptyLocation) {
+            rack = RackResolver.resolve(location).getNetworkLocation();
+          }
+          locToRackMap.put(location, rack);
+          if (rackLocations.get(rack) == null) {
+            // splits will probably be located in all racks
+            rackLocations.put(rack, new LocationHolder(numRemainingSplits));
+          }
+        }
+        distinctLocations.clear();
+        HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
+        int numRackSplitsToGroup = remainingSplits.size();
+        for (SplitContainer split : originalSplits) {
+          if (numRackSplitsToGroup == 0) {
+            break;
+          }
+          // Iterate through the original splits in their order and consider 
them for grouping.
+          // This maintains the original ordering in the list and thus 
subsequent grouping will
+          // maintain that order
+          if (!remainingSplits.contains(split)) {
+            continue;
+          }
+          numRackSplitsToGroup--;
+          rackSet.clear();
+          String[] locations = split.getPreferredLocations();
+          if (locations == null || locations.length == 0) {
+            locations = emptyLocations;
+          }
+          for (String location : locations ) {
+            if (location == null) {
+              location = emptyLocation;
+            }
+            rackSet.add(locToRackMap.get(location));
+          }
+          for (String rack : rackSet) {
+            rackLocations.get(rack).splits.add(split);
+          }
+        }
+
+        remainingSplits.clear();
+        distinctLocations = rackLocations;
+        // adjust split length to be smaller because the data is non local
+        float rackSplitReduction = conf.getFloat(
+            TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION,
+            TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);
+        if (rackSplitReduction > 0) {
+          long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);
+          int newNumSplitsInGroup = (int) 
(numSplitsInGroup*rackSplitReduction);
+          if (newLengthPerGroup > 0) {
+            lengthPerGroup = newLengthPerGroup;
+          }
+          if (newNumSplitsInGroup > 0) {
+            numSplitsInGroup = newNumSplitsInGroup;
+          }
+        }
+
+        LOG.info("Doing rack local after iteration: " + iterations +
+            " splitsProcessed: " + splitsProcessed +
+            " numFullGroupsInRound: " + numFullGroupsCreated +
+            " totalGroups: " + groupedSplits.size() +
+            " lengthPerGroup: " + lengthPerGroup +
+            " numSplitsInGroup: " + numSplitsInGroup);
+
+        // dont do smallGroups for the first pass
+        continue;
+      }
+
+      if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {
+        // a few nodes have a lot of data or data is thinly spread across nodes
+        // so allow small groups now
+        allowSmallGroups = true;
+        LOG.info("Allowing small groups after iteration: " + iterations +
+            " splitsProcessed: " + splitsProcessed +
+            " numFullGroupsInRound: " + numFullGroupsCreated +
+            " totalGroups: " + groupedSplits.size());
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Iteration: " + iterations +
+            " splitsProcessed: " + splitsProcessed +
+            " numFullGroupsInRound: " + numFullGroupsCreated +
+            " totalGroups: " + groupedSplits.size());
+      }
+    }
+    LOG.info("Number of splits desired: " + desiredNumSplits +
+        " created: " + groupedSplits.size() +
+        " splitsProcessed: " + splitsProcessed);
+    return groupedSplits;
+  }
+
+  /**
+   * Builder that can be used to configure grouping in Tez
+   *
+   * @param conf
+   *          {@link Configuration} This will be modified in place. If
+   *          configuration values may be changed at runtime via a config file
+   *          then pass in a {@link Configuration} that is initialized from a
+   *          config file. The parameters that are not overridden in code will
+   *          be derived from the Configuration object.
+   * @return {@link 
org.apache.tez.mapreduce.grouper.TezSplitGrouper.TezMRSplitsGrouperConfigBuilder}
+   */
+  public static TezMRSplitsGrouperConfigBuilder newConfigBuilder(Configuration 
conf) {
+    return new TezMRSplitsGrouperConfigBuilder(conf);
+  }
+
+  public static final class TezMRSplitsGrouperConfigBuilder {
+    private final Configuration conf;
+
+    /**
+     * This configuration will be modified in place
+     */
+    private TezMRSplitsGrouperConfigBuilder(@Nullable Configuration conf) {
+      if (conf == null) {
+        conf = new Configuration(false);
+      }
+      this.conf = conf;
+    }
+
+    public TezMRSplitsGrouperConfigBuilder setGroupSplitCount(int count) {
+      this.conf.setInt(TEZ_GROUPING_SPLIT_COUNT, count);
+      return this;
+    }
+
+    public TezMRSplitsGrouperConfigBuilder setGroupSplitByCount(boolean 
enabled) {
+      this.conf.setBoolean(TEZ_GROUPING_SPLIT_BY_COUNT, enabled);
+      return this;
+    }
+
+    public TezMRSplitsGrouperConfigBuilder setGroupSplitByLength(boolean 
enabled) {
+      this.conf.setBoolean(TEZ_GROUPING_SPLIT_BY_LENGTH, enabled);
+      return this;
+    }
+
+    public TezMRSplitsGrouperConfigBuilder setGroupSplitWaves(float 
multiplier) {
+      this.conf.setFloat(TEZ_GROUPING_SPLIT_WAVES, multiplier);
+      return this;
+    }
+
+    public TezMRSplitsGrouperConfigBuilder 
setGroupingRackSplitSizeReduction(float rackSplitSizeReduction) {
+      this.conf.setFloat(TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION, 
rackSplitSizeReduction);
+      return this;
+    }
+
+    /**
+     * upper and lower bounds for the splits
+     */
+    public TezMRSplitsGrouperConfigBuilder setGroupingSplitSize(long 
lowerBound, long upperBound) {
+      this.conf.setLong(TEZ_GROUPING_SPLIT_MIN_SIZE, lowerBound);
+      this.conf.setLong(TEZ_GROUPING_SPLIT_MAX_SIZE, upperBound);
+      return this;
+    }
+
+    public Configuration build() {
+      return this.conf;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
 
b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index 13b69c8..eddcc42 100644
--- 
a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ 
b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,8 +48,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.split.SplitSizeEstimator;
-import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.junit.Assert;
@@ -308,7 +307,7 @@ public class TestGroupedSplits {
     format.setConf(job);
     format.setInputFormat(mockWrappedFormat);
     
-    job = (JobConf) TezMapReduceSplitsGrouper.createConfigBuilder(job)
+    job = (JobConf) TezSplitGrouper.newConfigBuilder(job)
         .setGroupingSplitSize(50*1000*1000l, 500*1000*1000l)
         .build();
     InputSplit mockSplit1 = mock(InputSplit.class);
@@ -389,7 +388,7 @@ public class TestGroupedSplits {
     
     TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
     JobConf conf = new JobConf(defaultConf);
-    conf = (JobConf) TezMapReduceSplitsGrouper.createConfigBuilder(conf)
+    conf = (JobConf) TezSplitGrouper.newConfigBuilder(conf)
     .setGroupingSplitSize(splitLength*3, splitLength*3)
     .setGroupingRackSplitSizeReduction(1)
     .build();
@@ -446,7 +445,7 @@ public class TestGroupedSplits {
 
     TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
     JobConf conf = new JobConf(defaultConf);
-    conf = (JobConf) TezMapReduceSplitsGrouper.createConfigBuilder(conf)
+    conf = (JobConf) TezSplitGrouper.newConfigBuilder(conf)
     .setGroupingSplitSize(splitLength*3, splitLength*3)
     .setGroupingRackSplitSizeReduction(1)
     .build();
@@ -553,7 +552,7 @@ public class TestGroupedSplits {
   public void testGroupedSplitWithEstimator() throws IOException {
     JobConf job = new JobConf(defaultConf);
 
-    job = (JobConf) TezMapReduceSplitsGrouper.createConfigBuilder(job)
+    job = (JobConf) TezSplitGrouper.newConfigBuilder(job)
         .setGroupingSplitSize(12*1000*1000l, 25*1000*1000l)
         .build();
 

Reply via email to