Repository: hive
Updated Branches:
  refs/heads/master 20c95c1c0 -> 7cf791472


HIVE-19937: Intern fields in MapWork on deserialization (Sahil Takiar, reviewed by Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7cf79147
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7cf79147
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7cf79147

Branch: refs/heads/master
Commit: 7cf7914729ceeca34017ad6671a97a1290915e10
Parents: 20c95c1
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Mon Jun 18 15:38:13 2018 -0500
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Mon Aug 6 11:05:33 2018 +0200

----------------------------------------------------------------------
 .../hive/ql/exec/AbstractMapOperator.java       |  4 +-
 .../apache/hadoop/hive/ql/exec/MapOperator.java | 13 ++---
 .../hive/ql/exec/SerializationUtilities.java    | 52 +++++++++++++++++++-
 .../hive/ql/exec/vector/VectorMapOperator.java  | 12 ++---
 .../hive/ql/io/parquet/ProjectionPusher.java    |  4 +-
 .../org/apache/hadoop/hive/ql/plan/MapWork.java | 17 +++++--
 .../hadoop/hive/ql/plan/PartitionDesc.java      |  2 +-
 7 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
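
To illustrate the idea behind this change (a standalone sketch, not code from
this commit): MapWork plans for heavily partitioned tables hold many path
strings that differ only in their last component, and String.intern() collapses
equal strings to a single shared instance.

    public class InternSketch {
      public static void main(String[] args) {
        // Two distinct String objects with equal contents, like the repeated
        // scheme/authority prefix of many partition paths.
        String a = new String("hdfs://nn:8020/warehouse/db/tbl/part=1");
        String b = new String("hdfs://nn:8020/warehouse/db/tbl/part=1");
        System.out.println(a == b);                   // false: two heap copies
        System.out.println(a.intern() == b.intern()); // true: one shared copy
      }
    }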


http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
index 0d1c688..c7af991 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
@@ -96,7 +96,7 @@ public abstract class AbstractMapOperator extends Operator<MapWork>
     return path;
   }
 
-  protected String getNominalPath(Path fpath) {
+  protected Path getNominalPath(Path fpath) {
     Path nominal = null;
     boolean schemaless = fpath.toUri().getScheme() == null;
     for (Path onefile : conf.getPathToAliases().keySet()) {
@@ -119,7 +119,7 @@ public abstract class AbstractMapOperator extends Operator<MapWork>
     if (nominal == null) {
       throw new IllegalStateException("Invalid input path " + fpath);
     }
-    return nominal.toString();
+    return nominal;
   }
 
   public abstract void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf)

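A side note on the String -> Path change above (context, not part of the
commit): org.apache.hadoop.fs.Path defines equals() and hashCode() over its
underlying URI, so Path values can key the alias maps directly and the
toString() copies made on every lookup go away. A quick check, assuming
hadoop-common is on the classpath:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;

    public class PathKeySketch {
      public static void main(String[] args) {
        Map<Path, String> aliases = new HashMap<>();
        aliases.put(new Path("hdfs://nn:8020/warehouse/db/tbl"), "tbl");
        // Paths with equal URIs hash and compare equal.
        System.out.println(aliases.get(new Path("hdfs://nn:8020/warehouse/db/tbl"))); // tbl
      }
    }
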
http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index c7350ca..b9986d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -79,8 +79,7 @@ public class MapOperator extends AbstractMapOperator {
   protected transient long logEveryNRows = 0;
 
   // input path --> {operator --> context}
-  private final Map<String, Map<Operator<?>, MapOpCtx>> opCtxMap =
-      new HashMap<String, Map<Operator<?>, MapOpCtx>>();
+  private final Map<Path, Map<Operator<?>, MapOpCtx>> opCtxMap = new HashMap<>();
   // child operator --> object inspector (converted OI if it's needed)
   private final Map<Operator<?>, StructObjectInspector> childrenOpToOI =
       new HashMap<Operator<?>, StructObjectInspector>();
@@ -440,10 +439,8 @@ public class MapOperator extends AbstractMapOperator {
           LOG.debug("Adding alias " + alias + " to work list for file "
               + onefile);
         }
-        Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(onefile.toString());
-        if (contexts == null) {
-          opCtxMap.put(onefile.toString(), contexts = new LinkedHashMap<Operator<?>, MapOpCtx>());
-        }
+        Map<Operator<?>, MapOpCtx> contexts = opCtxMap.computeIfAbsent(onefile,
+                k -> new LinkedHashMap<>());
         if (contexts.containsKey(op)) {
           continue;
         }
@@ -515,7 +512,7 @@ public class MapOperator extends AbstractMapOperator {
   public void cleanUpInputFileChangedOp() throws HiveException {
     super.cleanUpInputFileChangedOp();
     Path fpath = getExecContext().getCurrentInputPath();
-    String nominalPath = getNominalPath(fpath);
+    Path nominalPath = getNominalPath(fpath);
     Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
     if (LOG.isInfoEnabled()) {
       StringBuilder builder = new StringBuilder();
@@ -703,7 +700,7 @@ public class MapOperator extends AbstractMapOperator {
 
   public void initializeContexts() {
     Path fpath = getExecContext().getCurrentInputPath();
-    String nominalPath = getNominalPath(fpath);
+    Path nominalPath = getNominalPath(fpath);
     Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
     currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
   }

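The hunk above also swaps the get/put-if-null pattern for Java 8's
Map.computeIfAbsent, which runs its mapping function only when the key is
absent. A minimal sketch of the idiom with placeholder value types:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ComputeIfAbsentSketch {
      public static void main(String[] args) {
        Map<String, Map<String, Integer>> opCtxMap = new HashMap<>();

        // Before: explicit null check plus put.
        Map<String, Integer> contexts = opCtxMap.get("file1");
        if (contexts == null) {
          opCtxMap.put("file1", contexts = new LinkedHashMap<>());
        }

        // After: one call; the lambda runs only for a missing key.
        Map<String, Integer> contexts2 =
            opCtxMap.computeIfAbsent("file2", k -> new LinkedHashMap<>());
        contexts2.put("someOp", 1);
      }
    }
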
http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index e03429b..28550d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
@@ -233,13 +234,14 @@ public class SerializationUtilities {
       kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
       kryo.register(new java.util.ArrayList().subList(0,0).getClass(), new ArrayListSubListSerializer());
       kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer());
+      kryo.register(MapWork.class, new MapWorkSerializer(kryo, MapWork.class));
+      kryo.register(PartitionDesc.class, new PartitionDescSerializer(kryo, PartitionDesc.class));
 
       ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
           .setFallbackInstantiatorStrategy(
               new StdInstantiatorStrategy());
       removeField(kryo, AbstractOperatorDesc.class, "colExprMap");
       removeField(kryo, AbstractOperatorDesc.class, "statistics");
-      kryo.register(MapWork.class);
       kryo.register(ReduceWork.class);
       kryo.register(TableDesc.class);
       kryo.register(UnionOperator.class);
@@ -541,6 +543,54 @@ public class SerializationUtilities {
   }
 
   /**
+   * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link MapWork} objects in
+   * order to invoke any string interning code present in the "setter" methods. The fields in
+   * {@link MapWork} often store paths that contain duplicate strings, so interning them can
+   * decrease memory usage significantly.
+   */
+  private static class MapWorkSerializer extends FieldSerializer<MapWork> {
+
+    MapWorkSerializer(Kryo kryo, Class type) {
+      super(kryo, type);
+    }
+
+    @Override
+    public MapWork read(Kryo kryo, Input input, Class<MapWork> type) {
+      MapWork mapWork = super.read(kryo, input, type);
+      // The set methods in MapWork intern any duplicate strings, which is why we call them
+      // during deserialization.
+      mapWork.setPathToPartitionInfo(mapWork.getPathToPartitionInfo());
+      mapWork.setPathToAliases(mapWork.getPathToAliases());
+      return mapWork;
+    }
+  }
+
+  /**
+   * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link PartitionDesc}
+   * objects in order to invoke any string interning code present in the "setter" methods.
+   * {@link PartitionDesc} objects are usually stored by {@link MapWork} objects and contain
+   * duplicate info like input format class names, partition specs, etc.
+   */
+  private static class PartitionDescSerializer extends FieldSerializer<PartitionDesc> {
+
+    PartitionDescSerializer(Kryo kryo, Class type) {
+      super(kryo, type);
+    }
+
+    @Override
+    public PartitionDesc read(Kryo kryo, Input input, Class<PartitionDesc> type) {
+      PartitionDesc partitionDesc = super.read(kryo, input, type);
+      // The set methods in PartitionDesc intern any duplicate strings, which is why we call
+      // them during deserialization.
+      partitionDesc.setBaseFileName(partitionDesc.getBaseFileName());
+      partitionDesc.setPartSpec(partitionDesc.getPartSpec());
+      partitionDesc.setInputFileFormatClass(partitionDesc.getInputFileFormatClass());
+      partitionDesc.setOutputFileFormatClass(partitionDesc.getOutputFileFormatClass());
+      return partitionDesc;
+    }
+  }
+
+  /**
    * Serializes the plan.
    *
    * @param plan The plan, such as QueryPlan, MapredWork, etc.

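The serializer pattern above generalizes: subclass Kryo's FieldSerializer, let
the default field-by-field read materialize the object, then re-invoke the
interning setters on it (Kryo writes fields reflectively and bypasses setters).
A minimal sketch with a hypothetical Config class, using the same Kryo 3/4-era
API as the diff:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.serializers.FieldSerializer;

    // Hypothetical class whose setter interns its argument.
    class Config {
      private String path;
      public String getPath() { return path; }
      public void setPath(String path) {
        this.path = path == null ? null : path.intern();
      }
    }

    class ConfigSerializer extends FieldSerializer<Config> {
      ConfigSerializer(Kryo kryo, Class type) {
        super(kryo, type);
      }

      @Override
      public Config read(Kryo kryo, Input input, Class<Config> type) {
        Config config = super.read(kryo, input, type);
        // Freshly deserialized strings are new objects; route them back
        // through the setter so they get interned.
        config.setPath(config.getPath());
        return config;
      }
    }

    // Registration mirrors the change above:
    //   kryo.register(Config.class, new ConfigSerializer(kryo, Config.class));
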
http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index bd70991..5a903d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -92,7 +92,7 @@ public class VectorMapOperator extends AbstractMapOperator {
   /*
    * Overall information on this vectorized Map operation.
    */
-  private transient HashMap<String, VectorPartitionContext> fileToPartitionContextMap;
+  private transient HashMap<Path, VectorPartitionContext> fileToPartitionContextMap;
 
   private transient Operator<? extends OperatorDesc> oneRootOperator;
 
@@ -555,7 +555,7 @@ public class VectorMapOperator extends AbstractMapOperator {
      * The Vectorizer class enforces that there is only one TableScanOperator, so
      * we don't need the more complicated multiple root operator mapping that MapOperator has.
      */
-    fileToPartitionContextMap = new HashMap<String, VectorPartitionContext>();
+    fileToPartitionContextMap = new HashMap<>();
 
     // Temporary map so we only create one partition context entry.
     HashMap<PartitionDesc, VectorPartitionContext> partitionContextMap =
@@ -573,7 +573,7 @@ public class VectorMapOperator extends AbstractMapOperator {
         vectorPartitionContext = partitionContextMap.get(partDesc);
       }
 
-      fileToPartitionContextMap.put(path.toString(), vectorPartitionContext);
+      fileToPartitionContextMap.put(path, vectorPartitionContext);
     }
 
     // Create list of one.
@@ -593,7 +593,7 @@ public class VectorMapOperator extends AbstractMapOperator {
 
   public void initializeContexts() throws HiveException {
     Path fpath = getExecContext().getCurrentInputPath();
-    String nominalPath = getNominalPath(fpath);
+    Path nominalPath = getNominalPath(fpath);
     setupPartitionContextVars(nominalPath);
   }
 
@@ -602,7 +602,7 @@ public class VectorMapOperator extends AbstractMapOperator {
   public void cleanUpInputFileChangedOp() throws HiveException {
     super.cleanUpInputFileChangedOp();
     Path fpath = getExecContext().getCurrentInputPath();
-    String nominalPath = getNominalPath(fpath);
+    Path nominalPath = getNominalPath(fpath);
 
     setupPartitionContextVars(nominalPath);
 
@@ -641,7 +641,7 @@ public class VectorMapOperator extends AbstractMapOperator {
   /*
    * Setup the context for reading from the next partition file.
    */
-  private void setupPartitionContextVars(String nominalPath) throws HiveException {
+  private void setupPartitionContextVars(Path nominalPath) throws HiveException {
 
     currentVectorPartContext = fileToPartitionContextMap.get(nominalPath);
     if (currentVectorPartContext == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index fd6cc86..0444562 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +69,8 @@ public class ProjectionPusher {
       pathToPartitionInfo.clear();
       for (final Map.Entry<Path, PartitionDesc> entry : mapWork.getPathToPartitionInfo().entrySet()) {
         // key contains scheme (such as pfile://) and we want only the path portion; fix in HIVE-6366
-        pathToPartitionInfo.put(Path.getPathWithoutSchemeAndAuthority(entry.getKey()), entry.getValue());
+        pathToPartitionInfo.put(StringInternUtils.internUriStringsInPath(
+                Path.getPathWithoutSchemeAndAuthority(entry.getKey())), entry.getValue());
       }
     }
   }

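Worth noting for the hunk above (context, not commit code):
Path.getPathWithoutSchemeAndAuthority builds a brand-new Path, so its URI
strings are fresh copies; routing the result through
StringInternUtils.internUriStringsInPath before it becomes a map key lets
duplicate components share storage. A sketch, assuming hadoop-common and
hive-common on the classpath:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.StringInternUtils;

    public class SchemeStripSketch {
      public static void main(String[] args) {
        Path full = new Path("pfile:///warehouse/db/tbl/part=1");
        // Drops the scheme ("pfile") and authority, keeping the path portion.
        Path stripped = Path.getPathWithoutSchemeAndAuthority(full);
        System.out.println(stripped); // /warehouse/db/tbl/part=1
        // Intern the URI's component strings before using it as a key.
        Path key = StringInternUtils.internUriStringsInPath(stripped);
      }
    }
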
http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index a0bd649..e7256cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -194,6 +194,7 @@ public class MapWork extends BaseWork {
   }
 
   public void addPathToAlias(Path path, ArrayList<String> aliases){
+    StringInternUtils.internUriStringsInPath(path);
     pathToAliases.put(path, aliases);
   }
 
@@ -201,6 +202,7 @@ public class MapWork extends BaseWork {
     ArrayList<String> aliases = pathToAliases.get(path);
     if (aliases == null) {
       aliases = new ArrayList<>();
+      StringInternUtils.internUriStringsInPath(path);
       pathToAliases.put(path, aliases);
     }
     aliases.add(newAlias.intern());
@@ -243,6 +245,9 @@ public class MapWork extends BaseWork {
   }
 
   public void setPathToPartitionInfo(final LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo) {
+    for (Path p : pathToPartitionInfo.keySet()) {
+      StringInternUtils.internUriStringsInPath(p);
+    }
     this.pathToPartitionInfo = pathToPartitionInfo;
   }
 
@@ -690,6 +695,10 @@ public class MapWork extends BaseWork {
     return eventSourceColumnTypeMap;
   }
 
+  public void setEventSourceColumnTypeMap(Map<String, List<String>> eventSourceColumnTypeMap) {
+    this.eventSourceColumnTypeMap = eventSourceColumnTypeMap;
+  }
+
   public Map<String, List<ExprNodeDesc>> getEventSourcePartKeyExprMap() {
     return eventSourcePartKeyExprMap;
   }
@@ -736,7 +745,7 @@ public class MapWork extends BaseWork {
 
   public void setIncludedBuckets(BitSet includedBuckets) {
     // see comment next to the field
-    this.includedBuckets = includedBuckets.toByteArray();
+    this.includedBuckets = includedBuckets == null ? null : includedBuckets.toByteArray();
   }
 
   public void setVectorizedRowBatch(VectorizedRowBatch vectorizedRowBatch) {
@@ -821,7 +830,8 @@ public class MapWork extends BaseWork {
   }
 
   public void setVectorizationEnabledConditionsMet(ArrayList<String> vectorizationEnabledConditionsMet) {
-    this.vectorizationEnabledConditionsMet = VectorizationCondition.addBooleans(vectorizationEnabledConditionsMet, true);
+    this.vectorizationEnabledConditionsMet = vectorizationEnabledConditionsMet == null ? null : VectorizationCondition.addBooleans(
+            vectorizationEnabledConditionsMet, true);
   }
 
   public List<String> getVectorizationEnabledConditionsMet() {
@@ -829,7 +839,8 @@ public class MapWork extends BaseWork {
   }
 
   public void setVectorizationEnabledConditionsNotMet(List<String> vectorizationEnabledConditionsNotMet) {
-    this.vectorizationEnabledConditionsNotMet = VectorizationCondition.addBooleans(vectorizationEnabledConditionsNotMet, false);
+    this.vectorizationEnabledConditionsNotMet = vectorizationEnabledConditionsNotMet == null ? null : VectorizationCondition.addBooleans(
+            vectorizationEnabledConditionsNotMet, false);
   }
 
   public List<String> getVectorizationEnabledConditionsNotMet() {

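The null guards added to the MapWork setters above make them safe to re-invoke
with whatever a deserialized (possibly empty) plan holds. A condensed sketch of
the pattern with a hypothetical holder class:

    import java.util.BitSet;

    public class NullGuardSketch {
      private byte[] includedBuckets;

      // Without the guard, calling the setter with null throws
      // NullPointerException on includedBuckets.toByteArray().
      public void setIncludedBuckets(BitSet includedBuckets) {
        this.includedBuckets =
            includedBuckets == null ? null : includedBuckets.toByteArray();
      }

      public static void main(String[] args) {
        NullGuardSketch work = new NullGuardSketch();
        work.setIncludedBuckets(null);          // safe with the guard
        work.setIncludedBuckets(new BitSet(8)); // normal path
      }
    }
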
http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 821e428..b226ab7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -67,7 +67,7 @@ public class PartitionDesc implements Serializable, Cloneable {
   private VectorPartitionDesc vectorPartitionDesc;
 
   public void setBaseFileName(String baseFileName) {
-    this.baseFileName = baseFileName.intern();
+    this.baseFileName = StringInternUtils.internIfNotNull(baseFileName);
   }
 
   public PartitionDesc() {

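The last hunk swaps a bare intern() call for a null-tolerant helper. Its
equivalent logic, sketched (assuming StringInternUtils.internIfNotNull does
what its name suggests):

    public class InternIfNotNullSketch {
      // Bare s.intern() would throw NullPointerException for a null s.
      static String internIfNotNull(String s) {
        return s == null ? null : s.intern();
      }

      public static void main(String[] args) {
        System.out.println(internIfNotNull(null));         // null, no exception
        System.out.println(internIfNotNull("part-00000")); // interned copy
      }
    }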