http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index 9a3f81c..22b052c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -27,11 +27,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
@@ -74,6 +75,8 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.Statistics.State;
+import org.apache.hadoop.hive.ql.plan.mapper.RuntimeStatsSource;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -132,7 +135,9 @@ public class StatsRulesProcFactory {
       try {
        // gather statistics for the first time and the attach it to table scan operator
        Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, colStatsCached, table, tsop);
-        tsop.setStatistics(stats.clone());
+
+        stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) tsop);
+        tsop.setStatistics(stats);
 
         if (LOG.isDebugEnabled()) {
          LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName() + "): " +
@@ -144,6 +149,7 @@ public class StatsRulesProcFactory {
       }
       return null;
     }
+
   }
 
   /**
@@ -181,14 +187,15 @@ public class StatsRulesProcFactory {
       if (satisfyPrecondition(parentStats)) {
        // this will take care of mapping between input column names and output column names. The
         // returned column stats will have the output column names.
-        List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats,
-            sop.getColumnExprMap(), sop.getSchema());
+        List<ColStatistics> colStats =
+            StatsUtils.getColStatisticsFromExprMap(conf, parentStats, sop.getColumnExprMap(), sop.getSchema());
         stats.setColumnStats(colStats);
         // in case of select(*) the data size does not change
        if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) {
          long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
           stats.setDataSize(dataSize);
         }
+        stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) sop);
         sop.setStatistics(stats);
 
         if (LOG.isDebugEnabled()) {
@@ -196,7 +203,8 @@ public class StatsRulesProcFactory {
         }
       } else {
         if (parentStats != null) {
-          sop.setStatistics(parentStats.clone());
+          stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) sop);
+          sop.setStatistics(stats);
 
           if (LOG.isDebugEnabled()) {
            LOG.debug("[1] STATS-" + sop.toString() + ": " + parentStats.extendedToString());
@@ -299,7 +307,10 @@ public class StatsRulesProcFactory {
            LOG.debug("[1] STATS-" + fop.toString() + ": " + st.extendedToString());
           }
         }
+
+        st = applyRuntimeStats(aspCtx.getParseContext().getContext(), st, (Operator<?>) fop);
         fop.setStatistics(st);
+
         aspCtx.setAndExprStats(null);
       }
       return null;
@@ -1249,6 +1260,7 @@ public class StatsRulesProcFactory {
         }
       }
 
+      stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) gop);
       gop.setStatistics(stats);
 
       if (LOG.isDebugEnabled() && stats != null) {
@@ -1576,6 +1588,7 @@ public class StatsRulesProcFactory {
           }
         }
 
+        stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, jop);
         jop.setStatistics(stats);
 
         if (LOG.isDebugEnabled()) {
@@ -1665,6 +1678,7 @@ public class StatsRulesProcFactory {
           }
         }
 
+        wcStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), wcStats, jop);
         jop.setStatistics(wcStats);
 
         if (LOG.isDebugEnabled()) {
@@ -2215,6 +2229,7 @@ public class StatsRulesProcFactory {
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
+      AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
       LimitOperator lop = (LimitOperator) nd;
      Operator<? extends OperatorDesc> parent = lop.getParentOperators().get(0);
       Statistics parentStats = parent.getStatistics();
@@ -2232,6 +2247,7 @@ public class StatsRulesProcFactory {
         if (limit <= parentStats.getNumRows()) {
           updateStats(stats, limit, true, lop);
         }
+        stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) lop);
         lop.setStatistics(stats);
 
         if (LOG.isDebugEnabled()) {
@@ -2243,7 +2259,8 @@ public class StatsRulesProcFactory {
           // in the absence of column statistics, compute data size based on
           // based on average row size
           limit = StatsUtils.getMaxIfOverflow(limit);
-          Statistics wcStats = parentStats.scaleToRowCount(limit);
+          Statistics wcStats = parentStats.scaleToRowCount(limit, true);
+          wcStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), wcStats, (Operator<?>) lop);
           lop.setStatistics(wcStats);
           if (LOG.isDebugEnabled()) {
            LOG.debug("[1] STATS-" + lop.toString() + ": " + wcStats.extendedToString());
@@ -2265,8 +2282,7 @@ public class StatsRulesProcFactory {
  public static class ReduceSinkStatsRule extends DefaultStatsRule implements NodeProcessor {
 
     @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
       ReduceSinkOperator rop = (ReduceSinkOperator) nd;
      Operator<? extends OperatorDesc> parent = rop.getParentOperators().get(0);
       Statistics parentStats = parent.getStatistics();
@@ -2283,8 +2299,7 @@ public class StatsRulesProcFactory {
            String prefixedKey = Utilities.ReduceField.KEY.toString() + "." + key;
             ExprNodeDesc end = colExprMap.get(prefixedKey);
             if (end != null) {
-              ColStatistics cs = StatsUtils
-                  .getColStatisticsFromExpression(conf, parentStats, end);
+              ColStatistics cs = StatsUtils.getColStatisticsFromExpression(conf, parentStats, end);
               if (cs != null) {
                 cs.setColumnName(prefixedKey);
                 colStats.add(cs);
@@ -2296,8 +2311,7 @@ public class StatsRulesProcFactory {
            String prefixedVal = Utilities.ReduceField.VALUE.toString() + "." + val;
             ExprNodeDesc end = colExprMap.get(prefixedVal);
             if (end != null) {
-              ColStatistics cs = StatsUtils
-                  .getColStatisticsFromExpression(conf, parentStats, end);
+              ColStatistics cs = StatsUtils.getColStatisticsFromExpression(conf, parentStats, end);
               if (cs != null) {
                 cs.setColumnName(prefixedVal);
                 colStats.add(cs);
@@ -2307,6 +2321,8 @@ public class StatsRulesProcFactory {
 
           outStats.setColumnStats(colStats);
         }
+
+        outStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), outStats, (Operator<?>) rop);
         rop.setStatistics(outStats);
         if (LOG.isDebugEnabled()) {
          LOG.debug("[0] STATS-" + rop.toString() + ": " + outStats.extendedToString());
@@ -2355,6 +2371,7 @@ public class StatsRulesProcFactory {
                LOG.debug("[0] STATS-" + op.toString() + ": " + stats.extendedToString());
               }
             }
+            stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, op);
             op.getConf().setStatistics(stats);
           }
         }
@@ -2473,4 +2490,24 @@ public class StatsRulesProcFactory {
    return stats != null && stats.getBasicStatsState().equals(Statistics.State.COMPLETE)
         && !stats.getColumnStatsState().equals(Statistics.State.NONE);
   }
+
+
+  private static Statistics applyRuntimeStats(Context context, Statistics stats, Operator<?> op) {
+    if (!context.getRuntimeStatsSource().isPresent()) {
+      return stats;
+    }
+    RuntimeStatsSource rss = context.getRuntimeStatsSource().get();
+
+    Optional<OperatorStats> os = rss.lookup(op);
+
+    if (!os.isPresent()) {
+      return stats;
+    }
+    LOG.debug("using runtime stats for {}; {}", op, os.get());
+    Statistics outStats = stats.clone();
+    outStats = outStats.scaleToRowCount(os.get().getOutputRecords(), false);
+    outStats.setRuntimeStats(true);
+    return outStats;
+  }
+
 }
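
Note on the helper above: the RuntimeStatsSource interface itself is not part
of this excerpt. Judging only from the call sites in applyRuntimeStats(), its
contract is roughly the following sketch (member names beyond lookup() are not
confirmed by this diff):

    // Sketch of the contract implied by applyRuntimeStats(); the real
    // interface (ql.plan.mapper.RuntimeStatsSource) may declare more members.
    public interface RuntimeStatsSource {
      // Statistics recorded for a matching operator during a previous
      // execution, if the source can associate the operator with them.
      Optional<OperatorStats> lookup(Operator<?> op);
    }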

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 78cbf25..a1ec96c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -303,6 +303,7 @@ KW_COMPACTIONS: 'COMPACTIONS';
 KW_TRANSACTIONS: 'TRANSACTIONS';
 KW_REWRITE : 'REWRITE';
 KW_AUTHORIZATION: 'AUTHORIZATION';
+KW_REOPTIMIZATION: 'REOPTIMIZATION';
 KW_CONF: 'CONF';
 KW_VALUES: 'VALUES';
 KW_RELOAD: 'RELOAD';

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 0c6aece..3abc752 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -774,14 +774,21 @@ explainStatement
        : KW_EXPLAIN (
           explainOption* execStatement -> ^(TOK_EXPLAIN execStatement explainOption*)
         |
-        KW_REWRITE queryStatementExpression -> ^(TOK_EXPLAIN_SQ_REWRITE queryStatementExpression))
+        KW_REWRITE queryStatementExpression -> ^(TOK_EXPLAIN_SQ_REWRITE queryStatementExpression)
+      )
+      )
        ;
 
 explainOption
 @init { msgs.push("explain option"); }
 @after { msgs.pop(); }
-    : KW_EXTENDED|KW_FORMATTED|KW_DEPENDENCY|KW_LOGICAL|KW_AUTHORIZATION|KW_ANALYZE|
-      (KW_VECTORIZATION vectorizationOnly? vectorizatonDetail?)
+    : KW_EXTENDED
+    | KW_FORMATTED
+    | KW_DEPENDENCY
+    | KW_LOGICAL
+    | KW_AUTHORIZATION
+    | KW_ANALYZE
+    | KW_REOPTIMIZATION
+    | (KW_VECTORIZATION vectorizationOnly? vectorizatonDetail?)
     ;
 
 vectorizationOnly

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 35f9edf..2bba33f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -832,6 +832,7 @@ nonReserved
     | KW_ZONE
     | KW_TIMESTAMPTZ
     | KW_DEFAULT
+    | KW_REOPTIMIZATION
    | KW_RESOURCE | KW_PLAN | KW_PLANS | KW_QUERY_PARALLELISM | KW_ACTIVATE | KW_MOVE | KW_DO
    | KW_POOL | KW_ALLOC_FRACTION | KW_SCHEDULING_POLICY | KW_PATH | KW_MAPPING | KW_WORKLOAD | KW_MANAGEMENT | KW_ACTIVE | KW_UNMANAGED
 

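The grammar changes above make REOPTIMIZATION a new, non-reserved EXPLAIN
option. A hypothetical smoke test over JDBC (connection URL and table are
placeholders; what the option actually prints depends on the rest of the
feature, which is outside this excerpt):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ExplainReoptimizationSmokeTest {
      public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
             Statement stmt = con.createStatement();
             // parses only with the new explainOption alternative in place
             ResultSet rs = stmt.executeQuery("EXPLAIN REOPTIMIZATION SELECT count(1) FROM src")) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      }
    }
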
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index 714cf39..e04a783 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -22,12 +22,9 @@ package org.apache.hadoop.hive.ql.plan;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.PTFUtils;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 
-public class AbstractOperatorDesc implements OperatorDesc {
+public abstract class AbstractOperatorDesc implements OperatorDesc {
 
   protected boolean vectorMode = false;
 
@@ -125,10 +122,12 @@ public class AbstractOperatorDesc implements OperatorDesc {
     this.memAvailable = memoryAvailble;
   }
 
+  @Override
   public String getRuntimeStatsTmpDir() {
     return runtimeStatsTmpDir;
   }
 
+  @Override
   public void setRuntimeStatsTmpDir(String runtimeStatsTmpDir) {
     this.runtimeStatsTmpDir = runtimeStatsTmpDir;
   }
@@ -161,4 +160,9 @@ public class AbstractOperatorDesc implements OperatorDesc {
     this.colExprMap = colExprMap;
   }
 
+  @Override
+  public void fillSignature(Map<String, Object> ret) {
+    throw new RuntimeException();
+  }
+
 }
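
The fillSignature() default above deliberately throws: concrete descriptors
instead mark the getters that define their identity with the @Signature
annotation (added throughout the *Desc classes below). The collector that
consumes the annotation is not shown in this excerpt; a rough reflective
sketch of the idea, assuming @Signature has runtime retention:

    import java.lang.reflect.Method;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.optimizer.signature.Signature;

    public final class SignatureSketch {
      // Harvest the values of all zero-argument getters annotated with
      // @Signature into a name -> value map.
      public static Map<String, Object> signatureOf(Object desc) throws Exception {
        Map<String, Object> sig = new HashMap<>();
        for (Method m : desc.getClass().getMethods()) {
          if (m.isAnnotationPresent(Signature.class) && m.getParameterCount() == 0) {
            sig.put(m.getName(), m.invoke(desc));
          }
        }
        return sig;
      }
    }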

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
index 7d5be6b..a68371a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
@@ -19,14 +19,15 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 import org.apache.hadoop.io.DataOutputBuffer;
 
 
+
 @SuppressWarnings("serial")
 @Explain(displayName = "Application Master Event Operator")
 public class AppMasterEventDesc extends AbstractOperatorDesc {
@@ -36,11 +37,13 @@ public class AppMasterEventDesc extends AbstractOperatorDesc {
   private String inputName;
 
   @Explain(displayName = "Target Vertex")
+  @Signature
   public String getVertexName() {
     return vertexName;
   }
 
   @Explain(displayName = "Target Input")
+  @Signature
   public String getInputName() {
     return inputName;
   }
@@ -53,6 +56,7 @@ public class AppMasterEventDesc extends AbstractOperatorDesc {
     this.vertexName = vertexName;
   }
 
+  @Signature
   public TableDesc getTable() {
     return table;
   }
@@ -98,4 +102,5 @@ public class AppMasterEventDesc extends AbstractOperatorDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
index 7332693..5a81add 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
 
@@ -38,6 +40,7 @@ public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable {
     this.mapJoinConversionPos = mapJoinConversionPos;
   }
 
+  @Signature
   public int getNumBuckets() {
     return numBuckets;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
index 5d3fdb8..32c6c6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
@@ -67,6 +68,7 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc {
     return targetColumnName + " (" + targetColumnType + ")";
   }
 
+  @Signature
   public String getTargetColumnName() {
     return targetColumnName;
   }
@@ -75,6 +77,7 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc {
     this.targetColumnName = columnName;
   }
 
+  @Signature
   public String getTargetColumnType() {
     return targetColumnType;
   }
@@ -94,6 +97,7 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc {
   }
 
   @Explain(displayName = "Partition key expr")
+  @Signature
   public String getPartKeyString() {
     return this.partKey.getExprString();
   }
@@ -112,4 +116,5 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index ce61fc5..e15a49f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 
@@ -191,6 +192,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   }
 
   @Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
+  @Signature
   public Path getDirName() {
     return dirName;
   }
@@ -214,6 +216,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  @Signature
   public TableDesc getTableInfo() {
     return tableInfo;
   }
@@ -223,6 +226,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   }
 
   @Explain(displayName = "compressed")
+  @Signature
   public boolean getCompressed() {
     return compressed;
   }
@@ -232,6 +236,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   }
 
   @Explain(displayName = "GlobalTableId", explainLevels = { Level.EXTENDED })
+  @Signature
+
   public int getDestTableId() {
     return destTableId;
   }
@@ -260,6 +266,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
    * @return the multiFileSpray
    */
   @Explain(displayName = "MultiFileSpray", explainLevels = { Level.EXTENDED })
+  @Signature
+
   public boolean isMultiFileSpray() {
     return multiFileSpray;
   }
@@ -311,6 +319,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
    * @return the totalFiles
    */
   @Explain(displayName = "TotalFiles", explainLevels = { Level.EXTENDED })
+  @Signature
+
   public int getTotalFiles() {
     return totalFiles;
   }
@@ -340,6 +350,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
    * @return the numFiles
    */
   @Explain(displayName = "NumFilesPerFileSink", explainLevels = { Level.EXTENDED })
+  @Signature
+
   public int getNumFiles() {
     return numFiles;
   }
@@ -364,6 +376,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   }
 
   @Explain(displayName = "Static Partition Specification", explainLevels = { Level.EXTENDED })
+  @Signature
   public String getStaticSpec() {
     return staticSpec;
   }
@@ -374,6 +387,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
 
   @Override
   @Explain(displayName = "GatherStats", explainLevels = { Level.EXTENDED })
+  @Signature
+
   public boolean isGatherStats() {
     return gatherStats;
   }
@@ -391,6 +406,9 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
    */
   @Override
   @Explain(displayName = "Stats Publishing Key Prefix", explainLevels = { Level.EXTENDED })
+  // FIXME: including this in the signature will almost certainly differ even if the operator is doing the same
+  // there might be conflicting usages of logicalCompare?
+  @Signature
   public String getStatsAggPrefix() {
     // dirName uniquely identifies destination directory of a FileSinkOperator.
    // If more than one FileSinkOperator write to the same partition, this dirName

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
index d59834c..fc7327a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 
@@ -109,6 +110,7 @@ public class FilterDesc extends AbstractOperatorDesc {
     this.sampleDescr = sampleDescr;
   }
 
+  @Signature
   public String getPredicateString() {
     return PlanUtils.getExprListString(Arrays.asList(predicate));
   }
@@ -137,6 +139,7 @@ public class FilterDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "isSamplingPred", explainLevels = { Level.EXTENDED })
+  @Signature
   public boolean getIsSamplingPred() {
     return isSamplingPred;
   }
@@ -154,6 +157,7 @@ public class FilterDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "sampleDesc", explainLevels = { Level.EXTENDED })
+  @Signature
   public String getSampleDescExpr() {
     return sampleDescr == null ? null : sampleDescr.toString();
   }
@@ -234,4 +238,5 @@ public class FilterDesc extends AbstractOperatorDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
index 86cc77d..31237c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
@@ -25,16 +25,12 @@ import java.util.Objects;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationDesc;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.udf.UDFType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hive.common.util.AnnotationUtils;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 
 
 /**
@@ -129,6 +125,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "mode")
+  @Signature
   public String getModeString() {
     switch (mode) {
     case COMPLETE:
@@ -155,6 +152,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "keys")
+  @Signature
   public String getKeyString() {
     return PlanUtils.getExprListString(keys);
   }
@@ -173,6 +171,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "outputColumnNames")
+  @Signature
   public ArrayList<java.lang.String> getOutputColumnNames() {
     return outputColumnNames;
   }
@@ -183,6 +182,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "pruneGroupingSetId", displayOnlyOnTrue = true)
+  @Signature
   public boolean pruneGroupingSetId() {
     return groupingSetPosition >= 0 &&
         outputColumnNames.size() != keys.size() + aggregators.size();
@@ -210,6 +210,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "aggregations", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  @Signature
   public List<String> getAggregatorStrings() {
     List<String> res = new ArrayList<String>();
     for (AggregationDesc agg: aggregators) {
@@ -235,6 +236,7 @@ public class GroupByDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "bucketGroup", displayOnlyOnTrue = true)
+  @Signature
   public boolean getBucketGroup() {
     return bucketGroup;
   }
@@ -424,4 +426,5 @@ public class GroupByDesc extends AbstractOperatorDesc {
     return false;
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
index 9c651ab..a61a47e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
 
@@ -289,6 +290,7 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
 
   @Override
   @Explain(displayName = "filter mappings", explainLevels = { Level.EXTENDED })
+  @Signature
   public Map<Integer, String> getFilterMapString() {
     return toCompactString(filterMap);
   }
@@ -304,6 +306,7 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
   /**
    * @return the keys in string form
    */
+  @Override
   @Explain(displayName = "keys")
   public Map<Byte, String> getKeysString() {
     Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
@@ -313,6 +316,7 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
     return keyMap;
   }
 
+  @Override
   @Explain(displayName = "keys", explainLevels = { Level.USER })
   public Map<Byte, String> getUserLevelExplainKeysString() {
     Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>();
@@ -399,4 +403,5 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
index 6dcf05a..ea22131 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Join conditions Descriptor implementation.
- * 
+ *
  */
 public class JoinCondDesc implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -153,7 +153,7 @@ public class JoinCondDesc implements Serializable {
 
   @Explain(explainLevels = { Level.USER })
   public String getUserLevelJoinCondString() {
-    JSONObject join = new JSONObject(new LinkedHashMap());
+    JSONObject join = new JSONObject(new LinkedHashMap<>());
     try {
       switch (type) {
       case JoinDesc.INNER_JOIN:
@@ -200,4 +200,6 @@ public class JoinCondDesc implements Serializable {
     }
     return true;
   }
+
+  // XXX: is hashCode missing here?
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index bd45c75..5b7f4c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -30,6 +30,7 @@ import java.util.Objects;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
@@ -229,6 +230,7 @@ public class JoinDesc extends AbstractOperatorDesc {
    * @return the keys in string form
    */
   @Explain(displayName = "keys")
+  @Signature
   public Map<Byte, String> getKeysString() {
     if (joinKeys == null) {
       return null;
@@ -266,6 +268,7 @@ public class JoinDesc extends AbstractOperatorDesc {
    * @return Map from alias to filters on the alias.
    */
   @Explain(displayName = "filter predicates")
+  @Signature
   public Map<Byte, String> getFiltersStringMap() {
     if (getFilters() == null || getFilters().size() == 0) {
       return null;
@@ -342,6 +345,7 @@ public class JoinDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "outputColumnNames")
+  @Signature
   public List<String> getOutputColumnNames() {
     return outputColumnNames;
   }
@@ -365,6 +369,7 @@ public class JoinDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "condition map", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  @Signature
   public List<JoinCondDesc> getCondsList() {
     if (conds == null) {
       return null;
@@ -425,6 +430,7 @@ public class JoinDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "handleSkewJoin", displayOnlyOnTrue = true)
+  @Signature
   public boolean getHandleSkewJoin() {
     return handleSkewJoin;
   }
@@ -524,6 +530,7 @@ public class JoinDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "nullSafes")
+  @Signature
   public String getNullSafeString() {
     if (nullsafes == null) {
       return null;
@@ -740,4 +747,5 @@ public class JoinDesc extends AbstractOperatorDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
index 3837a49..85a4683 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
 import java.util.ArrayList;
 import java.util.Objects;
 
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
 
@@ -49,6 +50,7 @@ public class LateralViewJoinDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "outputColumnNames")
+  @Signature
   public ArrayList<String> getOutputInternalColNames() {
     return outputInternalColNames;
   }
@@ -74,4 +76,5 @@ public class LateralViewJoinDesc extends AbstractOperatorDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
index ce53fea..698af94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 
@@ -58,6 +59,7 @@ public class LimitDesc extends AbstractOperatorDesc {
     this.offset = offset;
   }
 
+  @Signature
   @Explain(displayName = "Number of rows", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public int getLimit() {
     return limit;
@@ -100,4 +102,5 @@ public class LimitDesc extends AbstractOperatorDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index cf4ab60..91ea159 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -33,6 +33,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
@@ -139,6 +140,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
   }
 
   @Explain(displayName = "input vertices", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  @Signature
   public Map<Integer, String> getParentToInput() {
     return parentToInput;
   }
@@ -156,6 +158,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
   }
 
   @Explain(displayName = "Estimated key counts", explainLevels = { Level.EXTENDED })
+  @Signature
   public String getKeyCountsExplainDesc() {
     StringBuilder result = null;
     for (Map.Entry<Integer, Long> entry : parentKeyCounts.entrySet()) {
@@ -250,6 +253,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
    * @return the position of the big table not in memory
    */
   @Explain(displayName = "Position of Big Table", explainLevels = { Level.EXTENDED })
+  @Signature
   public int getPosBigTable() {
     return posBigTable;
   }
@@ -340,6 +344,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
   }
 
   @Explain(displayName = "BucketMapJoin", explainLevels = { Level.USER, Level.EXTENDED }, displayOnlyOnTrue = true)
+  @Signature
   public boolean isBucketMapJoin() {
     return isBucketMapJoin;
   }
@@ -607,4 +612,5 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
index 870b4d9..e8a5827 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
@@ -34,7 +34,10 @@ public interface OperatorDesc extends Serializable, Cloneable {
   public void setMaxMemoryAvailable(long memoryAvailble);
   public String getRuntimeStatsTmpDir();
   public void setRuntimeStatsTmpDir(String runtimeStatsTmpDir);
+
   boolean isSame(OperatorDesc other);
   public Map<String, ExprNodeDesc> getColumnExprMap();
   public void setColumnExprMap(Map<String, ExprNodeDesc> colExprMap);
+
+  void fillSignature(Map<String, Object> ret);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index bf24ff8..f2955af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc.ReduceSinkKeyType;
@@ -97,7 +98,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
   private float topNMemoryUsage = -1;
   private boolean mapGroupBy;  // for group-by, values with same key on top-K should be forwarded
   //flag used to control how TopN handled for PTF/Windowing partitions.
-  private boolean isPTFReduceSink = false; 
+  private boolean isPTFReduceSink = false;
   private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable
   private boolean forwarding; // Whether this RS can forward records directly instead of shuffling/sorting
 
@@ -206,6 +207,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     return PlanUtils.getExprListString(keyCols);
   }
 
+  @Signature
   public java.util.ArrayList<ExprNodeDesc> getKeyCols() {
     return keyCols;
   }
@@ -227,6 +229,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     return PlanUtils.getExprListString(valueCols);
   }
 
+  @Signature
   public java.util.ArrayList<ExprNodeDesc> getValueCols() {
     return valueCols;
   }
@@ -245,6 +248,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     return PlanUtils.getExprListString(partitionCols, true);
   }
 
+  @Signature
   public java.util.ArrayList<ExprNodeDesc> getPartitionCols() {
     return partitionCols;
   }
@@ -261,6 +265,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     return false;
   }
 
+  @Signature
   @Explain(displayName = "tag", explainLevels = { Level.EXTENDED })
   public int getTag() {
     return tag;
@@ -270,6 +275,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     this.tag = tag;
   }
 
+  @Signature
   public int getTopN() {
     return topN;
   }
@@ -349,6 +355,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
    *         of the same length as key columns, that consists of only "+"
    *         (ascending order) and "-" (descending order).
    */
+  @Signature
   @Explain(displayName = "sort order")
   public String getOrder() {
     return keySerializeInfo.getProperties().getProperty(
@@ -437,6 +444,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     return forwarding;
   }
 
+  @Signature
   @Explain(displayName = "auto parallelism", explainLevels = { Level.EXTENDED })
   public final boolean isAutoParallel() {
     return (this.reduceTraits.contains(ReducerTraits.AUTOPARALLEL));
@@ -462,7 +470,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     // reducers or hash function.
 
     boolean wasUnset = this.reduceTraits.remove(ReducerTraits.UNSET);
-    
+
     if (this.reduceTraits.contains(ReducerTraits.FIXED)) {
       return;
     } else if (traits.contains(ReducerTraits.FIXED)) {
@@ -661,4 +669,5 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
index 858de98..53fca99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
@@ -18,12 +18,13 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import java.util.Objects;
+
 import org.apache.hadoop.hive.ql.exec.RecordReader;
 import org.apache.hadoop.hive.ql.exec.RecordWriter;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
-import java.util.Objects;
-
 
 /**
  * ScriptDesc.
@@ -63,6 +64,7 @@ public class ScriptDesc extends AbstractOperatorDesc {
     this.scriptErrInfo = scriptErrInfo;
   }
 
+  @Signature
   @Explain(displayName = "command", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public String getScriptCmd() {
     return scriptCmd;
@@ -72,6 +74,7 @@ public class ScriptDesc extends AbstractOperatorDesc {
     this.scriptCmd = scriptCmd;
   }
 
+  @Signature
   @Explain(displayName = "output info", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public TableDesc getScriptOutputInfo() {
     return scriptOutputInfo;
@@ -154,4 +157,5 @@ public class ScriptDesc extends AbstractOperatorDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
index e38e7e4..51b94fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 
@@ -72,6 +73,7 @@ public class SelectDesc extends AbstractOperatorDesc {
     return ret;
   }
 
+  @Signature
   @Explain(displayName = "expressions")
   public String getColListString() {
     return PlanUtils.getExprListString(colList);
@@ -86,6 +88,7 @@ public class SelectDesc extends AbstractOperatorDesc {
     this.colList = colList;
   }
 
+  @Signature
   @Explain(displayName = "outputColumnNames")
   public List<java.lang.String> getOutputColumnNames() {
     return outputColumnNames;
@@ -101,6 +104,7 @@ public class SelectDesc extends AbstractOperatorDesc {
     this.outputColumnNames = outputColumnNames;
   }
 
+  @Signature
   @Explain(displayName = "SELECT * ")
   public String explainNoCompute() {
     if (isSelStarNoCompute()) {
@@ -184,4 +188,5 @@ public class SelectDesc extends AbstractOperatorDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
index 0057f0c..fd461ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
@@ -49,6 +49,7 @@ public class Statistics implements Serializable {
   private State basicStatsState;
   private Map<String, ColStatistics> columnStats;
   private State columnStatsState;
+  private boolean runtimeStats;
 
   public Statistics() {
     this(0, 0);
@@ -119,6 +120,9 @@ public class Statistics implements Serializable {
   @Explain(displayName = "Statistics")
   public String toString() {
     StringBuilder sb = new StringBuilder();
+    if (runtimeStats) {
+      sb.append("(RUNTIME) ");
+    }
     sb.append("Num rows: ");
     sb.append(numRows);
     if (runTimeNumRows >= 0) {
@@ -136,6 +140,9 @@ public class Statistics implements Serializable {
   @Explain(displayName = "Statistics", explainLevels = { Level.USER })
   public String toUserLevelExplainString() {
     StringBuilder sb = new StringBuilder();
+    if (runtimeStats) {
+      sb.append("runtime: ");
+    }
     sb.append("rows=");
     sb.append(numRows);
     if (runTimeNumRows >= 0) {
@@ -153,6 +160,9 @@ public class Statistics implements Serializable {
 
   public String extendedToString() {
     StringBuilder sb = new StringBuilder();
+    if (runtimeStats) {
+      sb.append(" (runtime) ");
+    }
     sb.append(" numRows: ");
     sb.append(numRows);
     sb.append(" dataSize: ");
@@ -179,6 +189,8 @@ public class Statistics implements Serializable {
       }
       clone.setColumnStats(cloneColStats);
     }
+    // TODO: this boolean flag is set only by RS stats annotation at this point
+    //clone.setRuntimeStats(runtimeStats);
     return clone;
   }
 
@@ -300,10 +312,13 @@ public class Statistics implements Serializable {
     this.runTimeNumRows = runTimeNumRows;
   }
 
-  public Statistics scaleToRowCount(long newRowCount) {
+  public Statistics scaleToRowCount(long newRowCount, boolean downScaleOnly) {
     Statistics ret;
     ret = clone();
-    if(numRows == 0 || newRowCount >= numRows) {
+    if (numRows == 0) {
+      return ret;
+    }
+    if (downScaleOnly && newRowCount >= numRows) {
       return ret;
     }
     // FIXME: using real scaling by new/old ration might yield better results?
@@ -311,4 +326,12 @@ public class Statistics implements Serializable {
     ret.dataSize = StatsUtils.safeMult(getAvgRowSize(), newRowCount);
     return ret;
   }
+
+  public boolean isRuntimeStats() {
+    return runtimeStats;
+  }
+
+  public void setRuntimeStats(final boolean runtimeStats) {
+    this.runtimeStats = runtimeStats;
+  }
 }
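
The new downScaleOnly flag keeps the old LIMIT behaviour (estimates are never
scaled up) while letting runtime stats move row counts in either direction.
The guards behave as follows (the remaining lines of the method recompute
dataSize from the average row size):

    stats.scaleToRowCount(n, true);   // unchanged clone if numRows == 0 or n >= numRows
    stats.scaleToRowCount(n, false);  // only the numRows == 0 case short-circuits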

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 59968fa..57df7e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.parse.TableSample;
 import org.apache.hadoop.hive.ql.plan.BaseWork.BaseExplainVectorization;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -156,10 +157,20 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
   }
 
   @Explain(displayName = "alias")
+  // FIXME: this might not need to be in the signature; but in that case the compare shouldn't consider it either!
+  @Signature
   public String getAlias() {
     return alias;
   }
 
+  @Signature
+  public String getPredicateString() {
+    if (filterExpr == null) {
+      return null;
+    }
+    return PlanUtils.getExprListString(Arrays.asList(filterExpr));
+  }
+
   @Explain(displayName = "table", jsonOnly = true)
   public String getTableName() {
     return this.tableName;
@@ -219,6 +230,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
     return PlanUtils.getExprListString(Arrays.asList(filterExpr));
   }
 
+  // @Signature // XXX
   public ExprNodeGenericFuncDesc getFilterExpr() {
     return filterExpr;
   }
@@ -296,6 +308,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
 
   @Override
   @Explain(displayName = "GatherStats", explainLevels = { Level.EXTENDED })
+  @Signature
   public boolean isGatherStats() {
     return gatherStats;
   }
@@ -347,6 +360,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
     this.rowLimit = rowLimit;
   }
 
+  @Signature
   public int getRowLimit() {
     return rowLimit;
   }
@@ -372,6 +386,11 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
     return isMetadataOnly;
   }
 
+  //  @Signature
+  public String getQualifiedTable() {
+    return tableMetadata.getFullyQualifiedName();
+  }
+
   public Table getTableMetadata() {
     return tableMetadata;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
index cf8e6e5..adcf707 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
 import java.util.Objects;
 
+import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
 
 /**
  * All member variables should have a setters and getters of the form get<member
@@ -54,6 +56,7 @@ public class UDTFDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "function name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  @Signature
   public String getUDTFName() {
     return genericUDTF.toString();
   }
@@ -67,6 +70,7 @@ public class UDTFDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "outer lateral view")
+  @Signature
   public String isOuterLateralView() {
     return outerLV ? "true" : null;
   }
@@ -80,4 +84,5 @@ public class UDTFDesc extends AbstractOperatorDesc {
     }
     return false;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
new file mode 100644
index 0000000..57762ed
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+public class EmptyStatsSource implements StatsSource {
+
+  @Override
+  public boolean canProvideStatsFor(Class<?> class1) {
+    return false;
+  }
+
+}
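
The StatsSource interface implemented by this null object is not included in
the excerpt; from this class it minimally declares (a sketch, the real
interface may declare more):

    public interface StatsSource {
      boolean canProvideStatsFor(Class<?> clazz);
    }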

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java
new file mode 100644
index 0000000..7b9e99e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.LinkGroup;
+
+public interface GroupTransformer {
+  void map(LinkGroup group);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
new file mode 100644
index 0000000..36d7e58
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Enables connecting related objects to each other.
+ *
+ * Most importantly, it helps connect Operators to OperatorStats, and possibly RelNodes.
+ */
+public class PlanMapper {
+
+  Set<LinkGroup> groups = new HashSet<>();
+  private Map<Object, LinkGroup> objectMap = new HashMap<>();
+
+  public class LinkGroup {
+    Set<Object> members = new HashSet<>();
+
+    public void add(Object o) {
+      members.add(o);
+      objectMap.put(o, this);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> List<T> getAll(Class<T> clazz) {
+      List<T> ret = new ArrayList<>();
+      for (Object m : members) {
+        if (clazz.isInstance(m)) {
+          ret.add((T) m);
+        }
+      }
+      return ret;
+    }
+  }
+
+  public void link(Object o1, Object o2) {
+    LinkGroup g1 = objectMap.get(o1);
+    LinkGroup g2 = objectMap.get(o2);
+    if (g1 != null && g2 != null && g1 != g2) {
+      throw new RuntimeException("equivalence mapping violation");
+    }
+    LinkGroup targetGroup = (g1 != null) ? g1 : (g2 != null ? g2 : new LinkGroup());
+    groups.add(targetGroup);
+    targetGroup.add(o1);
+    targetGroup.add(o2);
+  }
+
+  public <T> List<T> getAll(Class<T> clazz) {
+    List<T> ret = new ArrayList<>();
+    for (LinkGroup g : groups) {
+      ret.addAll(g.getAll(clazz));
+    }
+    return ret;
+  }
+
+  public void runMapper(GroupTransformer mapper) {
+    for (LinkGroup equivGroup : groups) {
+      mapper.map(equivGroup);
+    }
+  }
+
+  public <T> List<T> lookupAll(Class<T> clazz, Object key) {
+    LinkGroup group = objectMap.get(key);
+    if (group == null) {
+      throw new NoSuchElementException(Objects.toString(key));
+    }
+    return group.getAll(clazz);
+  }
+
+  public <T> T lookup(Class<T> clazz, Object key) {
+    List<T> all = lookupAll(clazz, key);
+    if (all.size() != 1) {
+      // FIXME: use a different exception type?
+      throw new IllegalArgumentException("Expected match count is 1; but got:" 
+ all);
+    }
+    return all.get(0);
+  }
+
+  @VisibleForTesting
+  public Iterator<LinkGroup> iterateGroups() {
+    return groups.iterator();
+  }
+
+}
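
For reference, a minimal self-contained sketch (not part of the patch) of the
LinkGroup mechanics; a plain String stands in for an Operator, since link()
accepts any Object:

    import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
    import org.apache.hadoop.hive.ql.stats.OperatorStats;

    public class PlanMapperSketch {
      public static void main(String[] args) {
        PlanMapper pm = new PlanMapper();
        OperatorStats stats = new OperatorStats("TS_0");
        stats.setOutputRecords(42);
        // places both objects into the same LinkGroup
        pm.link("TS_0", stats);
        // lookup() expects exactly one member of the requested type in the key's group
        OperatorStats found = pm.lookup(OperatorStats.class, "TS_0");
        System.out.println(found); // OperatorStats TS_0 records: 42
        // linking members of two different existing groups would throw
        // RuntimeException("equivalence mapping violation")
      }
    }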

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java
new file mode 100644
index 0000000..424dd79
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignatureFactory;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.LinkGroup;
+
+/**
+ * Post-processes a PlanMapper: adds the OpTreeSignature of every linked Operator to its group.
+ */
+public class PlanMapperProcess {
+
+  private static class OpTreeSignatureMapper implements GroupTransformer {
+
+    private OpTreeSignatureFactory cache = OpTreeSignatureFactory.newCache();
+
+    @Override
+    public void map(LinkGroup group) {
+      List<Operator> operators = group.getAll(Operator.class);
+      for (Operator op : operators) {
+        group.add(OpTreeSignature.of(op, cache));
+      }
+    }
+  }
+
+  public static void runPostProcess(PlanMapper planMapper) {
+    planMapper.runMapper(new OpTreeSignatureMapper());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java
new file mode 100644
index 0000000..21a0678
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.Optional;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+public interface RuntimeStatsSource extends StatsSource {
+  Optional<OperatorStats> lookup(Operator<?> op);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
new file mode 100644
index 0000000..6f340b8
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+public class SimpleRuntimeStatsSource implements RuntimeStatsSource {
+
+  private final PlanMapper pm;
+
+  public SimpleRuntimeStatsSource(PlanMapper pm) {
+    PlanMapperProcess.runPostProcess(pm);
+    this.pm = pm;
+  }
+
+  @Override
+  public Optional<OperatorStats> lookup(Operator<?> op) {
+    try {
+      OpTreeSignature sig = OpTreeSignature.of(op);
+      List<OperatorStats> v = pm.lookupAll(OperatorStats.class, sig);
+      if (!v.isEmpty()) {
+        return Optional.of(v.get(0));
+      }
+      return Optional.empty();
+    } catch (NoSuchElementException | IllegalArgumentException e) {
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public boolean canProvideStatsFor(Class<?> clazz) {
+    return Operator.class.isAssignableFrom(clazz)
+        || HiveFilter.class.isAssignableFrom(clazz);
+  }
+
+}
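
A hedged sketch of the intended lookup flow (illustrative only; the PlanMapper
is assumed to have been populated during the previous, failed execution, and
the Operator to come from the recompiled plan):

    import java.util.Optional;

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
    import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
    import org.apache.hadoop.hive.ql.stats.OperatorStats;

    public class RuntimeStatsLookupSketch {
      static long observedRows(PlanMapper pm, Operator<?> op) {
        // the constructor runs the signature post-process over pm
        SimpleRuntimeStatsSource source = new SimpleRuntimeStatsSource(pm);
        // operators are matched across compilations via OpTreeSignature
        Optional<OperatorStats> stats = source.lookup(op);
        return stats.map(OperatorStats::getOutputRecords).orElse(-1L);
      }
    }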

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
new file mode 100644
index 0000000..a4cb6e9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+public interface StatsSource {
+
+  boolean canProvideStatsFor(Class<?> clazz);
+
+}
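
EmptyStatsSource can serve as a null object for this interface: it answers
false for every class, so callers need not special-case a missing source. A
small sketch (the nullable parameter is hypothetical, not from the patch):

    import org.apache.hadoop.hive.ql.plan.mapper.EmptyStatsSource;
    import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;

    public class NullObjectSketch {
      static StatsSource orEmpty(StatsSource maybeSource) {
        // callers can then unconditionally ask canProvideStatsFor(...)
        return maybeSource != null ? maybeSource : new EmptyStatsSource();
      }
    }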

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java 
b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
new file mode 100644
index 0000000..2b0d23c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+
+/**
+ * Defines an interface for re-execution logic.
+ *
+ * FIXME: rethink methods.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface IReExecutionPlugin {
+
+  /**
+   * Called when the {@link Driver} is being initialized.
+   *
+   * The plugin may add hooks/etc to tap into the system.
+   */
+  void initialize(Driver driver);
+
+  /**
+   * Called before executing the query.
+   */
+  void beforeExecute(int executionIndex, boolean explainReOptimization);
+
+  /**
+   * The query has failed; does this plugin advise re-executing it?
+   */
+  boolean shouldReExecute(int executionNum);
+
+  /**
+   * The plugin should prepare for the re-compilation of the query.
+   */
+  void prepareToReExecute();
+
+  /**
+   * The query has failed and has been recompiled; does this plugin advise re-executing it?
+   */
+  boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper);
+
+}
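
To make the contract concrete, a minimal plugin sketch (illustrative, not part
of the patch) that advises exactly one unconditional retry:

    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
    import org.apache.hadoop.hive.ql.reexec.IReExecutionPlugin;

    public class RetryOncePlugin implements IReExecutionPlugin {
      @Override
      public void initialize(Driver driver) {
        // no hooks needed for this trivial strategy
      }

      @Override
      public void beforeExecute(int executionIndex, boolean explainReOptimization) {
      }

      @Override
      public boolean shouldReExecute(int executionNum) {
        return executionNum == 1;
      }

      @Override
      public void prepareToReExecute() {
      }

      @Override
      public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
        return executionNum == 1;
      }
    }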

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java 
b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
new file mode 100644
index 0000000..9303171
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.QueryDisplay;
+import org.apache.hadoop.hive.ql.QueryInfo;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Enables the use of re-execution logic.
+ *
+ * Implements the IDriver interface, handles query re-execution, and asks the underlying
+ * re-execution plugins simple yes/no questions.
+ */
+public class ReExecDriver implements IDriver {
+
+  private class HandleReOptimizationExplain implements HiveSemanticAnalyzerHook {
+
+    @Override
+    public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast) throws SemanticException {
+      if (ast.getType() == HiveParser.TOK_EXPLAIN) {
+        int childCount = ast.getChildCount();
+        for (int i = 1; i < childCount; i++) {
+          if (ast.getChild(i).getType() == HiveParser.KW_REOPTIMIZATION) {
+            explainReOptimization = true;
+            ast.deleteChild(i);
+            break;
+          }
+        }
+        if (explainReOptimization && firstExecution()) {
+          Tree execTree = ast.getChild(0);
+          execTree.setParent(ast.getParent());
+          ast.getParent().setChild(0, execTree);
+          return (ASTNode) execTree;
+        }
+      }
+      return ast;
+    }
+
+    @Override
+    public void postAnalyze(HiveSemanticAnalyzerHookContext context, List<Task<? extends Serializable>> rootTasks)
+        throws SemanticException {
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReExecDriver.class);
+  private boolean explainReOptimization;
+  protected Driver coreDriver;
+  private QueryState queryState;
+  private String currentQuery;
+  private int executionIndex;
+
+  private ArrayList<IReExecutionPlugin> plugins;
+
+  @Override
+  public HiveConf getConf() {
+    return queryState.getConf();
+  }
+
+  public boolean firstExecution() {
+    return executionIndex == 0;
+  }
+
+  public ReExecDriver(QueryState queryState, String userName, QueryInfo queryInfo,
+      ArrayList<IReExecutionPlugin> plugins) {
+    this.queryState = queryState;
+    coreDriver = new Driver(queryState, userName, queryInfo, null);
+    coreDriver.getHookRunner().addSemanticAnalyzerHook(new HandleReOptimizationExplain());
+    this.plugins = plugins;
+
+    for (IReExecutionPlugin p : plugins) {
+      p.initialize(coreDriver);
+    }
+  }
+
+  @Override
+  public int compile(String string) {
+    return coreDriver.compile(string);
+  }
+
+  @Override
+  public CommandProcessorResponse compileAndRespond(String statement) {
+    currentQuery = statement;
+    return coreDriver.compileAndRespond(statement);
+  }
+
+  @Override
+  public QueryPlan getPlan() {
+    return coreDriver.getPlan();
+  }
+
+  @Override
+  public QueryDisplay getQueryDisplay() {
+    return coreDriver.getQueryDisplay();
+  }
+
+  @Override
+  public void setOperationId(String guid64) {
+    coreDriver.setOperationId(guid64);
+  }
+
+  @Override
+  public CommandProcessorResponse run() {
+    executionIndex = 0;
+    int maxExecutions = 1 + coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT);
+
+    while (true) {
+      executionIndex++;
+      for (IReExecutionPlugin p : plugins) {
+        p.beforeExecute(executionIndex, explainReOptimization);
+      }
+      coreDriver.getContext().setExecutionIndex(executionIndex);
+      LOG.info("Execution #{} of query", executionIndex);
+      CommandProcessorResponse cpr = coreDriver.run();
+
+      boolean shouldReExecute = explainReOptimization && executionIndex == 1;
+      shouldReExecute |= cpr.getResponseCode() != 0 && shouldReExecute();
+
+      if (executionIndex >= maxExecutions || !shouldReExecute) {
+        return cpr;
+      }
+      LOG.info("Preparing to re-execute query");
+      prepareToReExecute();
+      PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
+      CommandProcessorResponse compileResponse = coreDriver.compileAndRespond(currentQuery);
+      if (compileResponse.failed()) {
+        // FIXME: somehow record that the re-execution compilation has failed, even though the query compiled successfully before
+        return compileResponse;
+      }
+
+      PlanMapper newPlanMapper = coreDriver.getPlanMapper();
+      if (!explainReOptimization && !shouldReExecuteAfterCompile(oldPlanMapper, newPlanMapper)) {
+        // FIXME: retain the old error, or create a new one?
+        return cpr;
+      }
+    }
+  }
+
+  private boolean shouldReExecuteAfterCompile(PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
+    boolean ret = false;
+    for (IReExecutionPlugin p : plugins) {
+      ret |= p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper);
+    }
+    return ret;
+  }
+
+  private boolean shouldReExecute() {
+    boolean ret = false;
+    for (IReExecutionPlugin p : plugins) {
+      ret |= p.shouldReExecute(executionIndex);
+    }
+    return ret;
+  }
+
+  @Override
+  public CommandProcessorResponse run(String command) {
+    CommandProcessorResponse r0 = compileAndRespond(command);
+    if (r0.getResponseCode() != 0) {
+      return r0;
+    }
+    return run();
+  }
+
+  protected void prepareToReExecute() {
+    for (IReExecutionPlugin p : plugins) {
+      p.prepareToReExecute();
+    }
+  }
+
+  @Override
+  public boolean getResults(List res) throws IOException {
+    return coreDriver.getResults(res);
+  }
+
+  @Override
+  public void setMaxRows(int maxRows) {
+    coreDriver.setMaxRows(maxRows);
+  }
+
+  @Override
+  public FetchTask getFetchTask() {
+    return coreDriver.getFetchTask();
+  }
+
+  @Override
+  public Schema getSchema() {
+    return coreDriver.getSchema();
+  }
+
+  @Override
+  public boolean isFetchingTable() {
+    return coreDriver.isFetchingTable();
+  }
+
+  @Override
+  public void resetFetch() throws IOException {
+    coreDriver.resetFetch();
+  }
+
+  @Override
+  public void close() {
+    coreDriver.close();
+  }
+
+  @Override
+  public void destroy() {
+    coreDriver.destroy();
+  }
+
+  @Override
+  public final Context getContext() {
+    return coreDriver.getContext();
+  }
+
+  @VisibleForTesting
+  public void setRuntimeStatsSource(SimpleRuntimeStatsSource statsSource) {
+    coreDriver.setRuntimeStatsSource(statsSource);
+  }
+
+}
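
Caller-side usage, sketched (the QueryState/QueryInfo arguments are assumed to
be supplied by the session; "hive" is a placeholder user name):

    import java.util.ArrayList;

    import org.apache.hadoop.hive.ql.QueryInfo;
    import org.apache.hadoop.hive.ql.QueryState;
    import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
    import org.apache.hadoop.hive.ql.reexec.IReExecutionPlugin;
    import org.apache.hadoop.hive.ql.reexec.ReExecDriver;
    import org.apache.hadoop.hive.ql.reexec.ReOptimizePlugin;

    public class ReExecDriverSketch {
      static CommandProcessorResponse runWithRetry(QueryState queryState, QueryInfo queryInfo, String sql) {
        ArrayList<IReExecutionPlugin> plugins = new ArrayList<>();
        plugins.add(new ReOptimizePlugin());
        ReExecDriver driver = new ReExecDriver(queryState, "hive", queryInfo, plugins);
        // run() compiles and executes, then recompiles and re-executes up to
        // 1 + HIVE_QUERY_MAX_REEXECUTION_COUNT times while the plugins advise it
        return driver.run(sql);
      }
    }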

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java 
b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
new file mode 100644
index 0000000..4ee3c14
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+
+/**
+ * Re-executes a query, applying an extra configuration overlay for the retry.
+ */
+public class ReExecutionOverlayPlugin implements IReExecutionPlugin {
+
+  private Driver driver;
+  private Map<String, String> subtree;
+  private boolean retryPossible;
+
+  class LocalHook implements ExecuteWithHookContext {
+
+    @Override
+    public void run(HookContext hookContext) throws Exception {
+      if (hookContext.getHookType() == HookType.ON_FAILURE_HOOK) {
+        Throwable exception = hookContext.getException();
+        if (exception != null && exception.getMessage() != null
+            && exception.getMessage().contains("Vertex failed,")) {
+          retryPossible = true;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void initialize(Driver driver) {
+    this.driver = driver;
+    driver.getHookRunner().addOnFailureHook(new LocalHook());
+    HiveConf conf = driver.getConf();
+    subtree = conf.subtree("reexec.overlay");
+  }
+
+  @Override
+  public void prepareToReExecute() {
+    HiveConf conf = driver.getConf();
+    conf.verifyAndSetAll(subtree);
+  }
+
+  @Override
+  public boolean shouldReExecute(int executionNum) {
+    return executionNum == 1 && !subtree.isEmpty() && retryPossible;
+  }
+
+  @Override
+  public boolean shouldReExecute(int executionNum, PlanMapper pm1, PlanMapper pm2) {
+    return executionNum == 1;
+  }
+
+  @Override
+  public void beforeExecute(int executionIndex, boolean explainReOptimization) {
+  }
+
+}
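
The overlay mechanics, sketched (the wrapped key below is only an example;
subtree() is assumed to return the "reexec.overlay."-prefixed entries with the
prefix stripped, which prepareToReExecute() then applies via verifyAndSetAll):

    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;

    public class OverlaySketch {
      static Map<String, String> demo() {
        HiveConf conf = new HiveConf();
        // example overlay entry: on retry, disable map-join conversion
        conf.set("reexec.overlay.hive.auto.convert.join", "false");
        // assumed to yield {"hive.auto.convert.join" -> "false"}
        return conf.subtree("reexec.overlay");
      }
    }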

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java 
b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
new file mode 100644
index 0000000..7078587
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
+import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Re-optimizes a failed query using the operator statistics collected during the previous execution.
+ */
+public class ReOptimizePlugin implements IReExecutionPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReOptimizePlugin.class);
+
+  private boolean retryPossible;
+
+  private Driver coreDriver;
+
+  private OperatorStatsReaderHook statsReaderHook;
+
+  class LocalHook implements ExecuteWithHookContext {
+
+    @Override
+    public void run(HookContext hookContext) throws Exception {
+      if (hookContext.getHookType() == HookType.ON_FAILURE_HOOK) {
+        Throwable exception = hookContext.getException();
+        if (exception != null && exception.getMessage() != null) {
+          String message = exception.getMessage();
+          boolean isOOM = message.contains(MapJoinMemoryExhaustionError.class.getName())
+              || message.contains(OutOfMemoryError.class.getName());
+          if (message.contains("Vertex failed,") && isOOM) {
+            retryPossible = true;
+          }
+          LOG.info("Failure cause: {}", message);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void initialize(Driver driver) {
+    coreDriver = driver;
+    coreDriver.getHookRunner().addOnFailureHook(new LocalHook());
+    statsReaderHook = new OperatorStatsReaderHook();
+    coreDriver.getHookRunner().addOnFailureHook(statsReaderHook);
+    coreDriver.getHookRunner().addPostHook(statsReaderHook);
+    statsReaderHook.setCollectOnSuccess(
+        driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS));
+  }
+
+  @Override
+  public boolean shouldReExecute(int executionNum) {
+    return retryPossible;
+  }
+
+  @Override
+  public void prepareToReExecute() {
+    statsReaderHook.setCollectOnSuccess(true);
+    PlanMapper pm = coreDriver.getContext().getPlanMapper();
+    coreDriver.setRuntimeStatsSource(new SimpleRuntimeStatsSource(pm));
+    retryPossible = false;
+  }
+
+  @Override
+  public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
+    return planDidChange(oldPlanMapper, newPlanMapper);
+  }
+
+  private boolean planDidChange(PlanMapper pmL, PlanMapper pmR) {
+    List<Operator> opsL = getRootOps(pmL);
+    List<Operator> opsR = getRootOps(pmR);
+    for (Iterator<Operator> itL = opsL.iterator(); itL.hasNext();) {
+      Operator<?> opL = itL.next();
+      for (Iterator<Operator> itR = opsR.iterator(); itR.hasNext();) {
+        Operator<?> opR = itR.next();
+        if (opL.logicalEqualsTree(opR)) {
+          itL.remove();
+          itR.remove();
+          break;
+        }
+      }
+    }
+    // unmatched leftovers mean the recompiled plan differs from the old one
+    return !opsL.isEmpty() || !opsR.isEmpty();
+  }
+
+  private List<Operator> getRootOps(PlanMapper pmL) {
+    // keeps only terminal operators (those without children), i.e. the sinks of the plan
+    List<Operator> ops = pmL.getAll(Operator.class);
+    for (Iterator<Operator> iterator = ops.iterator(); iterator.hasNext();) {
+      Operator o = iterator.next();
+      if (o.getNumChild() != 0) {
+        iterator.remove();
+      }
+    }
+    return ops;
+  }
+
+  @Override
+  public void beforeExecute(int executionIndex, boolean explainReOptimization) {
+    if (explainReOptimization) {
+      statsReaderHook.setCollectOnSuccess(true);
+    }
+  }
+
+}
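
The pair-matching in planDidChange, distilled onto plain strings (illustrative
only; the real code compares Operator trees via logicalEqualsTree):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class PlanDiffSketch {
      public static void main(String[] args) {
        List<String> left = new ArrayList<>(Arrays.asList("FS_1", "FS_2"));
        List<String> right = new ArrayList<>(Arrays.asList("FS_2", "FS_3"));
        // remove pairwise matches, keeping only unmatched operators
        for (Iterator<String> itL = left.iterator(); itL.hasNext();) {
          String l = itL.next();
          for (Iterator<String> itR = right.iterator(); itR.hasNext();) {
            if (l.equals(itR.next())) {
              itL.remove();
              itR.remove();
              break;
            }
          }
        }
        // leftovers ("FS_1" and "FS_3") mean the two plans differ
        System.out.println(!left.isEmpty() || !right.isEmpty()); // true
      }
    }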

http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java 
b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
new file mode 100644
index 0000000..52e18a8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.stats;
+
+public class OperatorStats {
+  private final String operatorId;
+  private long outputRecords;
+
+  public OperatorStats(final String opId) {
+    this.operatorId = opId;
+    this.outputRecords = -1;
+  }
+
+  public long getOutputRecords() {
+    return outputRecords;
+  }
+
+  public void setOutputRecords(final long outputRecords) {
+    this.outputRecords = outputRecords;
+  }
+
+  public String getOperatorId() {
+    return operatorId;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("OperatorStats %s records: %d", operatorId, 
outputRecords);
+  }
+}
