Repository: hive
Updated Branches:
  refs/heads/master 463a51257 -> 109439c3f


HIVE-20393 : Semijoin Reduction : markSemiJoinForDPP behaves inconsistently 
(Deepak Jaiswal, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/109439c3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/109439c3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/109439c3

Branch: refs/heads/master
Commit: 109439c3fa7219187625d5f45e472d792aee9c82
Parents: 463a512
Author: Deepak Jaiswal <[email protected]>
Authored: Wed Aug 15 15:17:30 2018 -0700
Committer: Deepak Jaiswal <[email protected]>
Committed: Thu Aug 16 00:41:30 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/parse/TezCompiler.java       | 63 +++++++++++---------
 1 file changed, 36 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/109439c3/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index f316f09..fdc9635 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -1555,17 +1555,11 @@ public class TezCompiler extends TaskCompiler {
   private void markSemiJoinForDPP(OptimizeTezProcContext procCtx)
           throws SemanticException {
     // Stores the Tablescan operators processed to avoid redoing them.
-    Map<TableScanOperator, TableScanOperator> tsOps = new HashMap<>();
     Map<ReduceSinkOperator, SemiJoinBranchInfo> map = procCtx.parseContext.getRsToSemiJoinBranchInfo();
 
     for (ReduceSinkOperator rs : map.keySet()) {
       SemiJoinBranchInfo sjInfo = map.get(rs);
       TableScanOperator ts = sjInfo.getTsOp();
-      TableScanOperator tsInMap = tsOps.putIfAbsent(ts, ts);
-      if (tsInMap != null) {
-        // Already processed, skip
-        continue;
-      }
 
       if (sjInfo.getIsHint() || !sjInfo.getShouldRemove()) {
         continue;
@@ -1581,25 +1575,28 @@ public class TezCompiler extends TaskCompiler {
                 ((AppMasterEventOperator) op).getConf() instanceof DynamicPruningEventDesc) {
           // DPP. Now look up nDVs on both sides to see the selectivity.
           // <Parent Ops>-SEL-GB1-RS1-GB2-RS2
-          SelectOperator selOp = null;
-          try {
-            selOp = (SelectOperator)
-                    (rs.getParentOperators().get(0)
-                            .getParentOperators().get(0)
-                            .getParentOperators().get(0)
-                            .getParentOperators().get(0));
-          } catch (NullPointerException e) {
-            LOG.warn("markSemiJoinForDPP : Null pointer exception caught while accessing semijoin operators");
-            assert false;
-            return;
-          }
+          SelectOperator selOp = (SelectOperator)
+                  (rs.getParentOperators().get(0)
+                          .getParentOperators().get(0)
+                          .getParentOperators().get(0)
+                          .getParentOperators().get(0));
+
           try {
-            // If stats are not available, just assume its a useful edge
+            // Get nDVs on Semijoin edge side
             Statistics stats = selOp.getStatistics();
-            ExprNodeColumnDesc colExpr = ExprNodeDescUtils.getColumnExpr(
+            if (stats == null) {
+              // No stats found on semijoin edge, do nothing
+              break;
+            }
+            String selCol = ExprNodeDescUtils.extractColName(
                     selOp.getConf().getColList().get(0));
-            long nDVs = stats.getColumnStatisticsFromColName(
-                    colExpr.getColumn()).getCountDistint();
+            ColStatistics colStatisticsSJ = stats
+                    .getColumnStatisticsFromColName(selCol);
+            if (colStatisticsSJ == null) {
+              // No column stats found for semijoin edge
+              break;
+            }
+            long nDVs = colStatisticsSJ.getCountDistint();
             if (nDVs > 0) {
               // Lookup nDVs on TS side.
               RuntimeValuesInfo rti = procCtx.parseContext
@@ -1607,9 +1604,18 @@ public class TezCompiler extends TaskCompiler {
               ExprNodeDesc tsExpr = rti.getTsColExpr();
               FilterOperator fil = (FilterOperator) (ts.getChildOperators().get(0));
               Statistics filStats = fil.getStatistics();
-              ExprNodeColumnDesc tsColExpr = ExprNodeDescUtils.getColumnExpr(tsExpr);
-              long nDVsOfTS = filStats.getColumnStatisticsFromColName(
-                      tsColExpr.getColumn()).getCountDistint();
+              if (filStats == null) {
+                // No stats found on target, do nothing
+                break;
+              }
+              String colName = ExprNodeDescUtils.extractColName(tsExpr);
+              ColStatistics colStatisticsTarget = filStats
+                      .getColumnStatisticsFromColName(colName);
+              if (colStatisticsTarget == null) {
+                // No column stats found on target
+                break;
+              }
+              long nDVsOfTS = colStatisticsTarget.getCountDistint();
               double nDVsOfTSFactored = nDVsOfTS * procCtx.conf.getFloatVar(
                       ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_DPP_FACTOR);
               if ((long)nDVsOfTSFactored > nDVs) {
@@ -1621,11 +1627,14 @@ public class TezCompiler extends TaskCompiler {
               }
             }
           } catch (NullPointerException e) {
-            sjInfo.setShouldRemove(false);
+            // Do nothing
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Caught NPE in markSemiJoinForDPP from ReduceSink " + rs + " to TS " + sjInfo.getTsOp());
+            }
           }
           break;
         }
-        if (op instanceof ReduceSinkOperator) {
+        if (op instanceof TerminalOperator) {
           // Done with this branch
           continue;
         }

Reply via email to