Author: sseth Date: Tue Mar 31 04:47:08 2015 New Revision: 1670253 URL: http://svn.apache.org/r1670253 Log: HIVE-10104. LLAP: Generate consistent splits and locations for the same split across jobs. (Siddharth Seth)
Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1670253&r1=1670252&r2=1670253&view=diff ============================================================================== --- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original) +++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Mar 31 04:47:08 2015 @@ -1941,6 +1941,8 @@ public class HiveConf extends Configurat "Whether to send the query plan via local resource or RPC"), HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true, "Whether to generate the splits locally or in the AM (tez only)"), + HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true, "Whether to generate consisten split" + + "locations when generating splits in the AM"), HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez (Hadoop 2 only)"), HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez (Hadoop 2 only)"), Modified: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1670253&r1=1670252&r2=1670253&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java Tue Mar 31 04:47:08 2015 @@ -484,7 +484,6 @@ public class LlapTaskSchedulerService ex String host = null; if (requestedHosts != null && requestedHosts.length > 0) { // Pick the first host always. Weak attempt at cache affinity. - Arrays.sort(requestedHosts); host = requestedHosts[0]; if (activeHosts.get(host) != null) { LOG.info("Selected host: " + host + " from requested hosts: " + Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1670253&r1=1670252&r2=1670253&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Tue Mar 31 04:47:08 2015 @@ -474,8 +474,12 @@ public class CustomPartitionVertex exten LOG.info("Setting vertex parallelism since we have seen all inputs."); + boolean generateConsistentSplits = HiveConf.getBoolVar( + conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS); + LOG.info("GenerateConsistenSplitsInHive=" + generateConsistentSplits); context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper - .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]))), emMap, + .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]), + generateConsistentSplits)), emMap, rootInputSpecUpdate); finalSplits.clear(); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1670253&r1=1670252&r2=1670253&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Tue Mar 31 04:47:08 2015 @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; import java.util.List; import com.google.common.base.Preconditions; @@ -26,10 +28,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -105,6 +109,8 @@ public class HiveSplitGenerator extends pruner.prune(); InputSplitInfoMem inputSplitInfo = null; + boolean generateConsistentSplits = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS); + LOG.info("GenerateConsistentSplitsInHive=" + generateConsistentSplits); String realInputFormatName = conf.get("mapred.input.format.class"); boolean groupingEnabled = userPayloadProto.getGroupingEnabled(); if (groupingEnabled) { @@ -123,6 +129,8 @@ public class HiveSplitGenerator extends TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT); InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves)); + // Sort the splits, so that subsequent grouping is consistent. + Arrays.sort(splits, new InputSplitComparator()); LOG.info("Number of input splits: " + splits.length + ". " + availableSlots + " available slots, " + waves + " waves. Input format is: " + realInputFormatName); @@ -132,7 +140,8 @@ public class HiveSplitGenerator extends InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]); LOG.info("Number of grouped splits: " + flatSplits.length); - List<TaskLocationHint> locationHints = splitGrouper.createTaskLocationHints(flatSplits); + List<TaskLocationHint> locationHints = + splitGrouper.createTaskLocationHints(flatSplits, generateConsistentSplits); Utilities.clearWork(jobConf); @@ -195,4 +204,47 @@ public class HiveSplitGenerator extends pruner.addEvent(e); } } + + // Descending sort based on split size| Followed by file name. Followed by startPosition. + private static class InputSplitComparator implements Comparator<InputSplit> { + @Override + public int compare(InputSplit o1, InputSplit o2) { + try { + long len1 = o1.getLength(); + long len2 = o2.getLength(); + if (len1 < len2) { + return 1; + } else if (len1 == len2) { + // If the same size. Sort on file name followed by startPosition. + if (o1 instanceof FileSplit && o2 instanceof FileSplit) { + FileSplit fs1 = (FileSplit) o1; + FileSplit fs2 = (FileSplit) o2; + if (fs1.getPath() != null && fs2.getPath() != null) { + int pathComp = (fs1.getPath().compareTo(fs2.getPath())); + if (pathComp == 0) { + // Compare start Position + long startPos1 = fs1.getStart(); + long startPos2 = fs2.getStart(); + if (startPos1 > startPos1) { + return 1; + } else if (startPos1 < startPos2) { + return -1; + } else { + return 0; + } + } else { + return pathComp; + } + } + } + // No further checks if not a file split. Return equality. + return 0; + } else { + return -1; + } + } catch (IOException e) { + throw new RuntimeException("Problem getting input split size", e); + } + } + } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1670253&r1=1670252&r2=1670253&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Tue Mar 31 04:47:08 2015 @@ -24,8 +24,10 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -105,19 +107,39 @@ public class SplitGrouper { /** * Create task location hints from a set of input splits * @param splits the actual splits + * @param consistentLocations whether to re-order locations for each split, if it's a file split * @return taskLocationHints - 1 per input split specified * @throws IOException */ - public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits) throws IOException { + public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits, boolean consistentLocations) throws IOException { List<TaskLocationHint> locationHints = Lists.newArrayListWithCapacity(splits.length); for (InputSplit split : splits) { String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null; if (rack == null) { - if (split.getLocations() != null) { - locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(split - .getLocations())), null)); + String [] locations = split.getLocations(); + if (locations != null && locations.length > 0) { + // Worthwhile only if more than 1 split, consistentGroupingEnabled and is a FileSplit + if (consistentLocations && locations.length > 1 && split instanceof FileSplit) { + Arrays.sort(locations); + FileSplit fileSplit = (FileSplit) split; + Path path = fileSplit.getPath(); + long startLocation = fileSplit.getStart(); + int hashCode = Objects.hash(path, startLocation); + int startIndex = hashCode % locations.length; + LinkedHashSet<String> locationSet = new LinkedHashSet<>(locations.length); + // Set up the locations starting from startIndex, and wrapping around the sorted array. + for (int i = 0 ; i < locations.length ; i++) { + int index = (startIndex + i) % locations.length; + locationSet.add(locations[index]); + } + locationHints.add(TaskLocationHint.createTaskLocationHint(locationSet, null)); + } else { + locationHints.add(TaskLocationHint + .createTaskLocationHint(new LinkedHashSet<String>(Arrays.asList(split + .getLocations())), null)); + } } else { locationHints.add(TaskLocationHint.createTaskLocationHint(null, null)); }