Author: sseth
Date: Tue Mar 31 04:47:08 2015
New Revision: 1670253

URL: http://svn.apache.org/r1670253
Log:
HIVE-10104. LLAP: Generate consistent splits and locations for the same split 
across jobs. (Siddharth Seth)

Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
    
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
    
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
    
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java

Modified: 
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1670253&r1=1670252&r2=1670253&view=diff
==============================================================================
--- 
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
(original)
+++ 
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
Tue Mar 31 04:47:08 2015
@@ -1941,6 +1941,8 @@ public class HiveConf extends Configurat
         "Whether to send the query plan via local resource or RPC"),
     HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true,
         "Whether to generate the splits locally or in the AM (tez only)"),
+    HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true,
+        "Whether to generate consistent split locations when generating splits in the AM"),
 
     HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container 
prewarm for Tez (Hadoop 2 only)"),
     HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls 
the number of containers to prewarm for Tez (Hadoop 2 only)"),

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1670253&r1=1670252&r2=1670253&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
 Tue Mar 31 04:47:08 2015
@@ -484,7 +484,6 @@ public class LlapTaskSchedulerService ex
       String host = null;
       if (requestedHosts != null && requestedHosts.length > 0) {
         // Pick the first host always. Weak attempt at cache affinity.
-        Arrays.sort(requestedHosts);
         host = requestedHosts[0];
         if (activeHosts.get(host) != null) {
           LOG.info("Selected host: " + host + " from requested hosts: " +

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1670253&r1=1670252&r2=1670253&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
 Tue Mar 31 04:47:08 2015
@@ -474,8 +474,12 @@ public class CustomPartitionVertex exten
 
     LOG.info("Setting vertex parallelism since we have seen all inputs.");
 
+    boolean generateConsistentSplits =  HiveConf.getBoolVar(
+        conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS);
+    LOG.info("GenerateConsistenSplitsInHive=" + generateConsistentSplits);
     context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper
-        .createTaskLocationHints(finalSplits.toArray(new 
InputSplit[finalSplits.size()]))), emMap,
+            .createTaskLocationHints(finalSplits.toArray(new 
InputSplit[finalSplits.size()]),
+                generateConsistentSplits)), emMap,
         rootInputSpecUpdate);
     finalSplits.clear();
   }

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1670253&r1=1670252&r2=1670253&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
 Tue Mar 31 04:47:08 2015
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
@@ -26,10 +28,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -105,6 +109,8 @@ public class HiveSplitGenerator extends
     pruner.prune();
 
     InputSplitInfoMem inputSplitInfo = null;
+    boolean generateConsistentSplits = HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS);
+    LOG.info("GenerateConsistentSplitsInHive=" + generateConsistentSplits);
     String realInputFormatName = conf.get("mapred.input.format.class");
     boolean groupingEnabled = userPayloadProto.getGroupingEnabled();
     if (groupingEnabled) {
@@ -123,6 +129,8 @@ public class HiveSplitGenerator extends
               TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
       InputSplit[] splits = inputFormat.getSplits(jobConf, (int) 
(availableSlots * waves));
+      // Sort the splits, so that subsequent grouping is consistent.
+      Arrays.sort(splits, new InputSplitComparator());
       LOG.info("Number of input splits: " + splits.length + ". " + 
availableSlots
           + " available slots, " + waves + " waves. Input format is: " + 
realInputFormatName);
 
@@ -132,7 +140,8 @@ public class HiveSplitGenerator extends
       InputSplit[] flatSplits = groupedSplits.values().toArray(new 
InputSplit[0]);
       LOG.info("Number of grouped splits: " + flatSplits.length);
 
-      List<TaskLocationHint> locationHints = 
splitGrouper.createTaskLocationHints(flatSplits);
+      List<TaskLocationHint> locationHints =
+          splitGrouper.createTaskLocationHints(flatSplits, 
generateConsistentSplits);
 
       Utilities.clearWork(jobConf);
 
@@ -195,4 +204,47 @@ public class HiveSplitGenerator extends
       pruner.addEvent(e);
     }
   }
+
+  /**
+   * Orders input splits deterministically so that the subsequent grouping is consistent.
+   * <p>
+   * Sort order:
+   * <ol>
+   *   <li>descending by split length (larger splits first);</li>
+   *   <li>on equal length, {@link FileSplit}s before any other split type;</li>
+   *   <li>among FileSplits, splits with a path before those without one,
+   *       then ascending by path;</li>
+   *   <li>within the same path, ascending by start offset.</li>
+   * </ol>
+   * The tie-breakers make the comparator antisymmetric and transitive, as the
+   * {@link Comparator} contract requires. Splits that still compare equal (two non-file
+   * splits of the same length, or two path-less file splits of the same length) keep
+   * their relative order from the input array, because {@code Arrays.sort} on objects is
+   * stable.
+   */
+  private static class InputSplitComparator implements Comparator<InputSplit> {
+    @Override
+    public int compare(InputSplit o1, InputSplit o2) {
+      // Primary key: descending length (arguments swapped on purpose).
+      int lengthComp;
+      try {
+        lengthComp = Long.compare(o2.getLength(), o1.getLength());
+      } catch (IOException e) {
+        throw new RuntimeException("Problem getting input split size", e);
+      }
+      if (lengthComp != 0) {
+        return lengthComp;
+      }
+
+      // Equal length: only FileSplits carry a path/offset to break the tie. Ordering
+      // FileSplits ahead of other split types, instead of calling every mixed pair equal,
+      // keeps the comparison transitive.
+      boolean isFile1 = o1 instanceof FileSplit;
+      boolean isFile2 = o2 instanceof FileSplit;
+      if (!isFile1 || !isFile2) {
+        return Boolean.compare(isFile2, isFile1);
+      }
+      FileSplit fs1 = (FileSplit) o1;
+      FileSplit fs2 = (FileSplit) o2;
+
+      // Splits without a path sort after those with one, for the same transitivity reason.
+      boolean noPath1 = fs1.getPath() == null;
+      boolean noPath2 = fs2.getPath() == null;
+      if (noPath1 || noPath2) {
+        return Boolean.compare(noPath1, noPath2);
+      }
+      int pathComp = fs1.getPath().compareTo(fs2.getPath());
+      if (pathComp != 0) {
+        return pathComp;
+      }
+
+      // Same file: ascending start offset.
+      return Long.compare(fs1.getStart(), fs2.getStart());
+    }
+  }
 }

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1670253&r1=1670252&r2=1670253&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
 Tue Mar 31 04:47:08 2015
@@ -24,8 +24,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -105,19 +107,39 @@ public class SplitGrouper {
   /**
    * Create task location hints from a set of input splits
    * @param splits the actual splits
+   * @param consistentLocations whether to re-order locations for each split, 
if it's a file split
    * @return taskLocationHints - 1 per input split specified
    * @throws IOException
    */
-  public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits) 
throws IOException {
+  public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits, 
boolean consistentLocations) throws IOException {
 
     List<TaskLocationHint> locationHints = 
Lists.newArrayListWithCapacity(splits.length);
 
     for (InputSplit split : splits) {
       String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) 
split).getRack() : null;
       if (rack == null) {
-        if (split.getLocations() != null) {
-          locationHints.add(TaskLocationHint.createTaskLocationHint(new 
HashSet<String>(Arrays.asList(split
-              .getLocations())), null));
+        String [] locations = split.getLocations();
+        if (locations != null && locations.length > 0) {
+          // Worthwhile only if more than 1 split, consistentGroupingEnabled 
and is a FileSplit
+          if (consistentLocations && locations.length > 1 && split instanceof 
FileSplit) {
+            Arrays.sort(locations);
+            FileSplit fileSplit = (FileSplit) split;
+            Path path = fileSplit.getPath();
+            long startLocation = fileSplit.getStart();
+            int hashCode = Objects.hash(path, startLocation);
+            int startIndex = hashCode % locations.length;
+            LinkedHashSet<String> locationSet = new 
LinkedHashSet<>(locations.length);
+            // Set up the locations starting from startIndex, and wrapping 
around the sorted array.
+            for (int i = 0 ; i < locations.length ; i++) {
+              int index = (startIndex + i) % locations.length;
+              locationSet.add(locations[index]);
+            }
+            
locationHints.add(TaskLocationHint.createTaskLocationHint(locationSet, null));
+          } else {
+            locationHints.add(TaskLocationHint
+                .createTaskLocationHint(new 
LinkedHashSet<String>(Arrays.asList(split
+                    .getLocations())), null));
+          }
         } else {
           locationHints.add(TaskLocationHint.createTaskLocationHint(null, 
null));
         }


Reply via email to