Repository: hive
Updated Branches:
  refs/heads/branch-3 e8f283cf1 -> 1d92530a1


HIVE-19824: Improve online datasize estimations for MapJoins (Zoltan Haindrich 
reviewed by Ashutosh Chauhan)

Signed-off-by: Zoltan Haindrich <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d92530a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d92530a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d92530a

Branch: refs/heads/branch-3
Commit: 1d92530a13d08f24b20c7056b0ba1e488a75d633
Parents: e8f283c
Author: Zoltan Haindrich <[email protected]>
Authored: Wed Jun 13 14:38:30 2018 +0200
Committer: Zoltan Haindrich <[email protected]>
Committed: Wed Jun 13 14:38:30 2018 +0200

----------------------------------------------------------------------
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |  46 +-
 .../clientpositive/bucket_map_join_tez2.q       |   8 +-
 .../test/queries/clientpositive/explainuser_2.q |   6 +-
 .../queries/clientpositive/join_max_hashtable.q |   2 +-
 .../queries/clientpositive/mapjoin_mapjoin.q    |   2 +-
 .../test/queries/clientpositive/tez_smb_main.q  |  10 +-
 .../queries/clientpositive/unionDistinct_1.q    |   3 +-
 .../clientpositive/mapjoin_mapjoin.q.out        | 103 +--
 .../spark/bucket_map_join_tez2.q.out            | 758 ++++++++++---------
 .../clientpositive/spark/mapjoin_mapjoin.q.out  | 114 ++-
 10 files changed, 537 insertions(+), 515 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 4d3d10d..d7b84fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.math.RoundingMode;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -26,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -72,6 +72,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.math.DoubleMath;
 
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
@@ -83,7 +85,7 @@ import com.google.common.annotations.VisibleForTesting;
 public class ConvertJoinMapJoin implements NodeProcessor {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName());
-
+  private float hashTableLoadFactor;
 
   @Override
   /*
@@ -97,6 +99,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
 
+    hashTableLoadFactor = 
context.conf.getFloatVar(ConfVars.HIVEHASHTABLELOADFACTOR);
+
     JoinOperator joinOp = (JoinOperator) nd;
     long maxSize = 
context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
     // adjust noconditional task size threshold for LLAP
@@ -222,11 +226,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
         continue;
       }
       Operator<? extends OperatorDesc> parentOp = 
joinOp.getParentOperators().get(pos);
-      totalSize += parentOp.getStatistics().getDataSize();
+      totalSize += computeOnlineDataSize(parentOp.getStatistics());
     }
 
     // Size of bigtable
-    long bigTableSize = 
joinOp.getParentOperators().get(mapJoinConversionPos).getStatistics().getDataSize();
+    long bigTableSize = 
computeOnlineDataSize(joinOp.getParentOperators().get(mapJoinConversionPos).getStatistics());
 
     // Network cost of DPHJ
     long networkCostDPHJ = totalSize + bigTableSize;
@@ -252,6 +256,27 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return false;
   }
 
+  private long computeOnlineDataSize(Statistics statistics) {
+    // Estimated mapjoin footprint: the raw data size, a per-row overhead of
+    // the storing datastructure (offset + length) and the hash slot array.
+    long vLongEstimatedLength = 6; // LazyBinaryUtils.writeVLongToByteArray
+    long memoryOverHeadPerRow = 2 * vLongEstimatedLength;
+
+    // A negative row count must not shrink the estimate.
+    long rowCount = Math.max(0L, statistics.getNumRows());
+    // Size the slot array for at least one row. Divide as double: a float
+    // cannot hold row counts above 2^24 exactly and may round them down.
+    double slotDemand = Math.max(1L, rowCount) / (double) hashTableLoadFactor;
+    int slotBits = DoubleMath.log2(slotDemand, RoundingMode.UP);
+    long worstCaseNeededSlots = 1L << slotBits;
+
+    long onlineDataSize = statistics.getDataSize();
+    onlineDataSize += memoryOverHeadPerRow * rowCount;
+    onlineDataSize += 8 * worstCaseNeededSlots; // every slot is a long
+
+    return onlineDataSize;
+  }
+
   @VisibleForTesting
   public MemoryMonitorInfo getMemoryMonitorInfo(final long maxSize,
                                                 final HiveConf conf,
@@ -855,11 +880,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
         return -1;
       }
 
-      long inputSize = currInputStat.getDataSize();
+      long inputSize = computeOnlineDataSize(currInputStat);
 
       boolean currentInputNotFittingInMemory = false;
       if ((bigInputStat == null)
-              || (inputSize > bigInputStat.getDataSize())) {
+          || (inputSize > computeOnlineDataSize(bigInputStat))) {
 
         if (foundInputNotFittingInMemory) {
           // cannot convert to map join; we've already chosen a big table
@@ -899,12 +924,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       boolean selectedBigTable = bigTableCandidateSet.contains(pos) &&
               (bigInputStat == null || currentInputNotFittingInMemory ||
                       (!foundInputNotFittingInMemory && 
(currentInputCumulativeCardinality > bigInputCumulativeCardinality ||
-                              (currentInputCumulativeCardinality == 
bigInputCumulativeCardinality && inputSize > bigInputStat.getDataSize()))));
+                  (currentInputCumulativeCardinality == 
bigInputCumulativeCardinality
+                      && inputSize > computeOnlineDataSize(bigInputStat)))));
 
       if (bigInputStat != null && selectedBigTable) {
         // We are replacing the current big table with a new one, thus
         // we need to count the current one as a map table then.
-        totalSize += bigInputStat.getDataSize();
+        totalSize += computeOnlineDataSize(bigInputStat);
         // Check if number of distinct keys is greater than given max number 
of entries
         // for HashMap
         if (checkMapJoinThresholds && 
!checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) {
@@ -1333,7 +1359,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     // Evaluate
     ReduceSinkOperator rsOp = (ReduceSinkOperator) 
joinOp.getParentOperators().get(position);
     Statistics inputStats = rsOp.getStatistics();
-    long inputSize = inputStats.getDataSize();
+    long inputSize = computeOnlineDataSize(inputStats);
     LOG.debug("Estimated size for input {}: {}; Max size for DPHJ conversion: 
{}",
         position, inputSize, max);
     if (inputSize > max) {
@@ -1363,7 +1389,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
         n = StatsUtils.safeMult(n, ndv);
       }
     }
-    final double nn = (double) n;
+    final double nn = n;
     final double a = (nn - 1d) / nn;
     if (a == 1d) {
       // A under-flows if nn is large.

http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q 
b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
index 8c55bb7..345f69a 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
@@ -5,7 +5,7 @@ set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
 set hive.auto.convert.join=true;
 set hive.auto.convert.join.noconditionaltask=true;
-set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.join.noconditionaltask.size=30000;
 
 CREATE TABLE srcbucket_mapjoin_n18(key int, value string) partitioned by (ds 
string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
 CREATE TABLE tab_part_n11 (key int, value string) PARTITIONED BY(ds STRING) 
CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
@@ -34,7 +34,7 @@ analyze table srcbucket_mapjoin_part_n20 compute statistics 
for columns;
 analyze table tab_n10 compute statistics for columns;
 analyze table tab_part_n11 compute statistics for columns;
 
-set hive.auto.convert.join.noconditionaltask.size=1500;
+set hive.auto.convert.join.noconditionaltask.size=3500;
 set hive.convert.join.bucket.mapjoin.tez = false;
 explain select a.key, b.key from tab_part_n11 a join tab_part_n11 c on a.key = 
c.key join tab_part_n11 b on a.value = b.value;
 set hive.convert.join.bucket.mapjoin.tez = true;
@@ -56,7 +56,7 @@ explain
 select a.key, a.value, b.value
 from tab1_n5 a join src b on a.key = b.key;
 
-set hive.auto.convert.join.noconditionaltask.size=500;
+set hive.auto.convert.join.noconditionaltask.size=2500;
 set hive.convert.join.bucket.mapjoin.tez = false;
 explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a join 
(select key from tab_part_n11 where key > 2) b on a.key = b.key;
@@ -78,7 +78,7 @@ set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a right 
outer join (select key from tab_part_n11 where key > 2) b on a.key = b.key;
 
-set hive.auto.convert.join.noconditionaltask.size=300;
+set hive.auto.convert.join.noconditionaltask.size=2000;
 set hive.convert.join.bucket.mapjoin.tez = false;
 explain select a.key, b.key from (select distinct key from tab_n10) a join 
tab_n10 b on b.key = a.key;
 set hive.convert.join.bucket.mapjoin.tez = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/test/queries/clientpositive/explainuser_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/explainuser_2.q 
b/ql/src/test/queries/clientpositive/explainuser_2.q
index abad080..0d5d635 100644
--- a/ql/src/test/queries/clientpositive/explainuser_2.q
+++ b/ql/src/test/queries/clientpositive/explainuser_2.q
@@ -101,7 +101,7 @@ JOIN (select key, value from src1 union select key, value 
from src union select
 
 set hive.auto.convert.join=true;
 set hive.auto.convert.join.noconditionaltask=true;
-set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.join.noconditionaltask.size=30000;
 set hive.stats.fetch.column.stats=false;
 
 
@@ -163,7 +163,7 @@ JOIN (select key, value from src1 union select key, value 
from src union select
 
 set hive.auto.convert.join=true;
 set hive.auto.convert.join.noconditionaltask=true;
-set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.join.noconditionaltask.size=20000;
 set hive.auto.convert.sortmerge.join.bigtable.selection.policy = 
org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
 
 CREATE TABLE srcbucket_mapjoin_n22(key int, value string) partitioned by (ds 
string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
@@ -195,7 +195,7 @@ select key,value from srcbucket_mapjoin_n22;
 set hive.convert.join.bucket.mapjoin.tez = false;
 set hive.auto.convert.sortmerge.join = true;
 
-set hive.auto.convert.join.noconditionaltask.size=500;
+set hive.auto.convert.join.noconditionaltask.size=2000;
 
 explain 
 select s1.key as key, s1.value as value from tab_n15 s1 join tab_n15 s3 on 
s1.key=s3.key;

http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/test/queries/clientpositive/join_max_hashtable.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/join_max_hashtable.q 
b/ql/src/test/queries/clientpositive/join_max_hashtable.q
index 8d0ccb7..3c93fb5 100644
--- a/ql/src/test/queries/clientpositive/join_max_hashtable.q
+++ b/ql/src/test/queries/clientpositive/join_max_hashtable.q
@@ -1,7 +1,7 @@
 set hive.auto.convert.join=true;
 set hive.optimize.dynamic.partition.hashjoin=true;
 set hive.auto.convert.join.hashtable.max.entries=500;
-set hive.auto.convert.join.shuffle.max.size=100000;
+set hive.auto.convert.join.shuffle.max.size=200000;
 
 -- CONVERT
 EXPLAIN

http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q 
b/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q
index ddfa608..f67930a 100644
--- a/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q
+++ b/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q
@@ -2,7 +2,7 @@ set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
 set hive.auto.convert.join=true;
 set hive.auto.convert.join.noconditionaltask=true;
-set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.join.noconditionaltask.size=30000;
 set hive.metastore.aggregate.stats.cache.enabled=false;
 set hive.stats.fetch.column.stats=false;
 -- Since the inputs are small, it should be automatically converted to mapjoin

http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/test/queries/clientpositive/tez_smb_main.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_main.q 
b/ql/src/test/queries/clientpositive/tez_smb_main.q
index f1b66f3..2417902 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_main.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_main.q
@@ -46,7 +46,7 @@ from tab_n11 a join tab_part_n12 b on a.key = b.key;
 select count(*)
 from tab_n11 a join tab_part_n12 b on a.key = b.key;
 
-set hive.auto.convert.join.noconditionaltask.size=2000;
+set hive.auto.convert.join.noconditionaltask.size=4000;
 set hive.mapjoin.hybridgrace.minwbsize=500;
 set hive.mapjoin.hybridgrace.minnumpartitions=4;
 explain
@@ -57,7 +57,7 @@ select count(*)
 from tab_n11 a join tab_part_n12 b on a.key = b.key;
 
 set hive.stats.fetch.column.stats=false;
-set hive.auto.convert.join.noconditionaltask.size=1000;
+set hive.auto.convert.join.noconditionaltask.size=4000;
 set hive.mapjoin.hybridgrace.minwbsize=250;
 set hive.mapjoin.hybridgrace.minnumpartitions=4;
 explain
@@ -68,7 +68,7 @@ select count(*)
 from tab_n11 a join tab_part_n12 b on a.key = b.key;
 
 
-set hive.auto.convert.join.noconditionaltask.size=500;
+set hive.auto.convert.join.noconditionaltask.size=2000;
 set hive.mapjoin.hybridgrace.minwbsize=125;
 set hive.mapjoin.hybridgrace.minnumpartitions=4;
 set hive.llap.memory.oversubscription.max.executors.per.query=0;
@@ -89,7 +89,7 @@ UNION  ALL
 select s2.key as key, s2.value as value from tab_n11 s2
 ) a join tab_part_n12 b on (a.key = b.key);
 
-set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.join.noconditionaltask.size=20000;
 
 set hive.llap.memory.oversubscription.max.executors.per.query=0;
 explain select count(*) from tab_n11 a join tab_part_n12 b on a.value = 
b.value;
@@ -109,6 +109,8 @@ UNION  ALL
 select s2.key as key, s2.value as value from tab_n11 s2
 ) a join tab_part_n12 b on (a.key = b.key);
 
+set hive.auto.convert.join.noconditionaltask.size=10000;
+
 explain
 select count(*) from
 (select rt1.id from

http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/test/queries/clientpositive/unionDistinct_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/unionDistinct_1.q 
b/ql/src/test/queries/clientpositive/unionDistinct_1.q
index 34da804..4f6589e 100644
--- a/ql/src/test/queries/clientpositive/unionDistinct_1.q
+++ b/ql/src/test/queries/clientpositive/unionDistinct_1.q
@@ -154,7 +154,7 @@ set hive.merge.mapfiles=false;
 
 set hive.auto.convert.join=true;
 set hive.auto.convert.join.noconditionaltask=true;
-set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.join.noconditionaltask.size=15000;
 
 -- Since the inputs are small, it should be automatically converted to mapjoin
 
@@ -306,6 +306,7 @@ set hive.stats.fetch.column.stats=false;
 
 -- SORT_QUERY_RESULTS
 
+set hive.auto.convert.join.noconditionaltask.size=20000;
 
 EXPLAIN
 SELECT 

http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out 
b/ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out
index a696961..ed92c17 100644
--- a/ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out
@@ -526,19 +526,20 @@ POSTHOOK: query: explain
 select count(*) from srcpart join src on (srcpart.value=src.value) join src 
src1 on (srcpart.key=src1.key) group by ds
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-9 is a root stage
-  Stage-7 depends on stages: Stage-9
-  Stage-8 depends on stages: Stage-7
+  Stage-8 is a root stage
   Stage-3 depends on stages: Stage-8
   Stage-0 depends on stages: Stage-3
 
 STAGE PLANS:
-  Stage: Stage-9
+  Stage: Stage-8
     Map Reduce Local Work
       Alias -> Map Local Tables:
         $hdt$_1:src1 
           Fetch Operator
             limit: -1
+        $hdt$_2:src 
+          Fetch Operator
+            limit: -1
       Alias -> Map Local Operator Tree:
         $hdt$_1:src1 
           TableScan
@@ -555,8 +556,23 @@ STAGE PLANS:
                   keys:
                     0 _col0 (type: string)
                     1 _col0 (type: string)
+        $hdt$_2:src 
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
+            Filter Operator
+              predicate: value is not null (type: boolean)
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
+              Select Operator
+                expressions: value (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                HashTable Sink Operator
+                  keys:
+                    0 _col1 (type: string)
+                    1 _col0 (type: string)
 
-  Stage: Stage-7
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -577,63 +593,26 @@ STAGE PLANS:
                     1 _col0 (type: string)
                   outputColumnNames: _col1, _col2
                   Statistics: Num rows: 2200 Data size: 23372 Basic stats: 
COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
-      Execution mode: vectorized
-      Local Work:
-        Map Reduce Local Work
-
-  Stage: Stage-8
-    Map Reduce Local Work
-      Alias -> Map Local Tables:
-        $hdt$_2:src 
-          Fetch Operator
-            limit: -1
-      Alias -> Map Local Operator Tree:
-        $hdt$_2:src 
-          TableScan
-            alias: src
-            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
-            Filter Operator
-              predicate: value is not null (type: boolean)
-              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
-              Select Operator
-                expressions: value (type: string)
-                outputColumnNames: _col0
-                Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
-                HashTable Sink Operator
-                  keys:
-                    0 _col1 (type: string)
-                    1 _col0 (type: string)
-
-  Stage: Stage-3
-    Map Reduce
-      Map Operator Tree:
-          TableScan
-            Map Join Operator
-              condition map:
-                   Inner Join 0 to 1
-              keys:
-                0 _col1 (type: string)
-                1 _col0 (type: string)
-              outputColumnNames: _col2
-              Statistics: Num rows: 2420 Data size: 25709 Basic stats: 
COMPLETE Column stats: NONE
-              Group By Operator
-                aggregations: count()
-                keys: _col2 (type: string)
-                mode: hash
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 2420 Data size: 25709 Basic stats: 
COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: string)
-                  sort order: +
-                  Map-reduce partition columns: _col0 (type: string)
-                  Statistics: Num rows: 2420 Data size: 25709 Basic stats: 
COMPLETE Column stats: NONE
-                  value expressions: _col1 (type: bigint)
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    keys:
+                      0 _col1 (type: string)
+                      1 _col0 (type: string)
+                    outputColumnNames: _col2
+                    Statistics: Num rows: 2420 Data size: 25709 Basic stats: 
COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: count()
+                      keys: _col2 (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 2420 Data size: 25709 Basic stats: 
COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 2420 Data size: 25709 Basic 
stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: bigint)
       Execution mode: vectorized
       Local Work:
         Map Reduce Local Work

http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/test/results/clientpositive/spark/bucket_map_join_tez2.q.out
----------------------------------------------------------------------
diff --git 
a/ql/src/test/results/clientpositive/spark/bucket_map_join_tez2.q.out 
b/ql/src/test/results/clientpositive/spark/bucket_map_join_tez2.q.out
index 6e5e4e6..73c2729 100644
--- a/ql/src/test/results/clientpositive/spark/bucket_map_join_tez2.q.out
+++ b/ql/src/test/results/clientpositive/spark/bucket_map_join_tez2.q.out
@@ -605,17 +605,16 @@ POSTHOOK: query: explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a join 
(select key from tab_part_n11 where key > 2) b on a.key = b.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
-      Edges:
-        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL 
SORT, 2)
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
@@ -627,13 +626,19 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
             Execution mode: vectorized
-        Map 3 
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
@@ -645,29 +650,26 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 182 Data size: 1939 Basic 
stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-        Reducer 2 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 _col0 (type: int)
-                  1 _col0 (type: int)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                  table:
-                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator
@@ -682,17 +684,16 @@ POSTHOOK: query: explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a join 
(select key from tab_part_n11 where key > 2) b on a.key = b.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
-      Edges:
-        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL 
SORT, 2)
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
@@ -704,13 +705,19 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
             Execution mode: vectorized
-        Map 3 
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
@@ -722,29 +729,26 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 182 Data size: 1939 Basic 
stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-        Reducer 2 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 _col0 (type: int)
-                  1 _col0 (type: int)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                  table:
-                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator
@@ -759,69 +763,71 @@ POSTHOOK: query: explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a left 
outer join (select key from tab_part_n11 where key > 2) b on a.key = b.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
-      Edges:
-        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL 
SORT, 2)
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
                   Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
                   Filter Operator
-                    predicate: (key > 1) (type: boolean)
+                    predicate: (key > 2) (type: boolean)
                     Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
                     Select Operator
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
             Execution mode: vectorized
-        Map 3 
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
                   Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
                   Filter Operator
-                    predicate: (key > 2) (type: boolean)
+                    predicate: (key > 1) (type: boolean)
                     Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
                     Select Operator
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Left Outer Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 182 Data size: 1939 Basic 
stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-        Reducer 2 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Left Outer Join 0 to 1
-                keys:
-                  0 _col0 (type: int)
-                  1 _col0 (type: int)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                  table:
-                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator
@@ -836,69 +842,71 @@ POSTHOOK: query: explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a left 
outer join (select key from tab_part_n11 where key > 2) b on a.key = b.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
-      Edges:
-        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL 
SORT, 2)
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
                   Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
                   Filter Operator
-                    predicate: (key > 1) (type: boolean)
+                    predicate: (key > 2) (type: boolean)
                     Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
                     Select Operator
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
             Execution mode: vectorized
-        Map 3 
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
                   Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
                   Filter Operator
-                    predicate: (key > 2) (type: boolean)
+                    predicate: (key > 1) (type: boolean)
                     Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
                     Select Operator
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Left Outer Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 182 Data size: 1939 Basic 
stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-        Reducer 2 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Left Outer Join 0 to 1
-                keys:
-                  0 _col0 (type: int)
-                  1 _col0 (type: int)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                  table:
-                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator
@@ -913,14 +921,13 @@ POSTHOOK: query: explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a right 
outer join (select key from tab_part_n11 where key > 2) b on a.key = b.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
-      Edges:
-        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL 
SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -935,13 +942,19 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-            Execution mode: vectorized
-        Map 3 
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+            Execution mode: vectorized
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
@@ -953,29 +966,26 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Right Outer Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          0 Map 1
+                        Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 182 Data size: 1939 Basic 
stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-        Reducer 2 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Right Outer Join 0 to 1
-                keys:
-                  0 _col0 (type: int)
-                  1 _col0 (type: int)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                  table:
-                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator
@@ -990,14 +1000,13 @@ POSTHOOK: query: explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a right 
outer join (select key from tab_part_n11 where key > 2) b on a.key = b.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
-      Edges:
-        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL 
SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1012,13 +1021,19 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
             Execution mode: vectorized
-        Map 3 
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: tab_part_n11
@@ -1030,29 +1045,26 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 166 Data size: 1763 Basic stats: 
COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Right Outer Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          0 Map 1
+                        Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 182 Data size: 1939 Basic 
stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-        Reducer 2 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Right Outer Join 0 to 1
-                keys:
-                  0 _col0 (type: int)
-                  1 _col0 (type: int)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 182 Data size: 1939 Basic stats: 
COMPLETE Column stats: NONE
-                  table:
-                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator
@@ -1065,15 +1077,15 @@ PREHOOK: type: QUERY
 POSTHOOK: query: explain select a.key, b.key from (select distinct key from 
tab_n10) a join tab_n10 b on b.key = a.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
       Edges:
         Reducer 2 <- Map 1 (GROUP, 2)
-        Reducer 3 <- Map 4 (PARTITION-LEVEL SORT, 2), Reducer 2 
(PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1095,7 +1107,26 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
             Execution mode: vectorized
-        Map 4 
+        Reducer 2 
+            Execution mode: vectorized
+            Local Work:
+              Map Reduce Local Work
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
+                Spark HashTable Sink Operator
+                  keys:
+                    0 _col0 (type: int)
+                    1 _col0 (type: int)
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
             Map Operator Tree:
                 TableScan
                   alias: b
@@ -1107,42 +1138,26 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
-            Execution mode: vectorized
-        Reducer 2 
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          0 Reducer 2
+                        Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 266 Data size: 2822 Basic 
stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-            Reduce Operator Tree:
-              Group By Operator
-                keys: KEY._col0 (type: int)
-                mode: mergepartial
-                outputColumnNames: _col0
-                Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: int)
-                  sort order: +
-                  Map-reduce partition columns: _col0 (type: int)
-                  Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
-        Reducer 3 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 _col0 (type: int)
-                  1 _col0 (type: int)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                  table:
-                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator
@@ -1155,15 +1170,15 @@ PREHOOK: type: QUERY
 POSTHOOK: query: explain select a.key, b.key from (select distinct key from 
tab_n10) a join tab_n10 b on b.key = a.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
       Edges:
         Reducer 2 <- Map 1 (GROUP, 2)
-        Reducer 3 <- Map 4 (PARTITION-LEVEL SORT, 2), Reducer 2 
(PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1185,7 +1200,26 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
             Execution mode: vectorized
-        Map 4 
+        Reducer 2 
+            Execution mode: vectorized
+            Local Work:
+              Map Reduce Local Work
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
+                Spark HashTable Sink Operator
+                  keys:
+                    0 _col0 (type: int)
+                    1 _col0 (type: int)
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
             Map Operator Tree:
                 TableScan
                   alias: b
@@ -1197,42 +1231,26 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
-            Execution mode: vectorized
-        Reducer 2 
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          0 Reducer 2
+                        Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 266 Data size: 2822 Basic 
stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-            Reduce Operator Tree:
-              Group By Operator
-                keys: KEY._col0 (type: int)
-                mode: mergepartial
-                outputColumnNames: _col0
-                Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: int)
-                  sort order: +
-                  Map-reduce partition columns: _col0 (type: int)
-                  Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
-        Reducer 3 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 _col0 (type: int)
-                  1 _col0 (type: int)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                  table:
-                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator
@@ -1245,15 +1263,15 @@ PREHOOK: type: QUERY
 POSTHOOK: query: explain select a.value, b.value from (select distinct value 
from tab_n10) a join tab_n10 b on b.key = a.value
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
       Edges:
         Reducer 2 <- Map 1 (GROUP, 2)
-        Reducer 3 <- Map 4 (PARTITION-LEVEL SORT, 2), Reducer 2 
(PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1275,7 +1293,26 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
             Execution mode: vectorized
-        Map 4 
+        Reducer 2 
+            Execution mode: vectorized
+            Local Work:
+              Map Reduce Local Work
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
+                Spark HashTable Sink Operator
+                  keys:
+                    0 UDFToDouble(_col0) (type: double)
+                    1 UDFToDouble(_col0) (type: double)
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
             Map Operator Tree:
                 TableScan
                   alias: b
@@ -1287,48 +1324,30 @@ STAGE PLANS:
                       expressions: key (type: int), value (type: string)
                       outputColumnNames: _col0, _col1
                       Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: UDFToDouble(_col0) (type: double)
-                        sort order: +
-                        Map-reduce partition columns: UDFToDouble(_col0) 
(type: double)
-                        Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: string)
-            Execution mode: vectorized
-        Reducer 2 
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 UDFToDouble(_col0) (type: double)
+                          1 UDFToDouble(_col0) (type: double)
+                        outputColumnNames: _col0, _col2
+                        input vertices:
+                          0 Reducer 2
+                        Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
+                        Select Operator
+                          expressions: _col0 (type: string), _col2 (type: 
string)
+                          outputColumnNames: _col0, _col1
+                          Statistics: Num rows: 266 Data size: 2822 Basic 
stats: COMPLETE Column stats: NONE
+                          File Output Operator
+                            compressed: false
+                            Statistics: Num rows: 266 Data size: 2822 Basic 
stats: COMPLETE Column stats: NONE
+                            table:
+                                input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                                output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                                serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-            Reduce Operator Tree:
-              Group By Operator
-                keys: KEY._col0 (type: string)
-                mode: mergepartial
-                outputColumnNames: _col0
-                Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: UDFToDouble(_col0) (type: double)
-                  sort order: +
-                  Map-reduce partition columns: UDFToDouble(_col0) (type: 
double)
-                  Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
-                  value expressions: _col0 (type: string)
-        Reducer 3 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 UDFToDouble(_col0) (type: double)
-                  1 UDFToDouble(_col0) (type: double)
-                outputColumnNames: _col0, _col2
-                Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string), _col2 (type: string)
-                  outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: false
-                    Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                    table:
-                        input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator
@@ -1341,15 +1360,15 @@ PREHOOK: type: QUERY
 POSTHOOK: query: explain select a.value, b.value from (select distinct value 
from tab_n10) a join tab_n10 b on b.key = a.value
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-2
     Spark
       Edges:
         Reducer 2 <- Map 1 (GROUP, 2)
-        Reducer 3 <- Map 4 (PARTITION-LEVEL SORT, 2), Reducer 2 
(PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1371,7 +1390,26 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
             Execution mode: vectorized
-        Map 4 
+        Reducer 2 
+            Execution mode: vectorized
+            Local Work:
+              Map Reduce Local Work
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
+                Spark HashTable Sink Operator
+                  keys:
+                    0 UDFToDouble(_col0) (type: double)
+                    1 UDFToDouble(_col0) (type: double)
+
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
             Map Operator Tree:
                 TableScan
                   alias: b
@@ -1383,48 +1421,30 @@ STAGE PLANS:
                       expressions: key (type: int), value (type: string)
                       outputColumnNames: _col0, _col1
                       Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: UDFToDouble(_col0) (type: double)
-                        sort order: +
-                        Map-reduce partition columns: UDFToDouble(_col0) 
(type: double)
-                        Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: string)
-            Execution mode: vectorized
-        Reducer 2 
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 UDFToDouble(_col0) (type: double)
+                          1 UDFToDouble(_col0) (type: double)
+                        outputColumnNames: _col0, _col2
+                        input vertices:
+                          0 Reducer 2
+                        Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
+                        Select Operator
+                          expressions: _col0 (type: string), _col2 (type: 
string)
+                          outputColumnNames: _col0, _col1
+                          Statistics: Num rows: 266 Data size: 2822 Basic 
stats: COMPLETE Column stats: NONE
+                          File Output Operator
+                            compressed: false
+                            Statistics: Num rows: 266 Data size: 2822 Basic 
stats: COMPLETE Column stats: NONE
+                            table:
+                                input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                                output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                                serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: vectorized
-            Reduce Operator Tree:
-              Group By Operator
-                keys: KEY._col0 (type: string)
-                mode: mergepartial
-                outputColumnNames: _col0
-                Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: UDFToDouble(_col0) (type: double)
-                  sort order: +
-                  Map-reduce partition columns: UDFToDouble(_col0) (type: 
double)
-                  Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
-                  value expressions: _col0 (type: string)
-        Reducer 3 
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 UDFToDouble(_col0) (type: double)
-                  1 UDFToDouble(_col0) (type: double)
-                outputColumnNames: _col0, _col2
-                Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string), _col2 (type: string)
-                  outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: false
-                    Statistics: Num rows: 266 Data size: 2822 Basic stats: 
COMPLETE Column stats: NONE
-                    table:
-                        input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Local Work:
+              Map Reduce Local Work
 
   Stage: Stage-0
     Fetch Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/1d92530a/ql/src/test/results/clientpositive/spark/mapjoin_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/mapjoin_mapjoin.q.out 
b/ql/src/test/results/clientpositive/spark/mapjoin_mapjoin.q.out
index fff2f31..db16a46 100644
--- a/ql/src/test/results/clientpositive/spark/mapjoin_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/mapjoin_mapjoin.q.out
@@ -563,7 +563,26 @@ STAGE PLANS:
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Map 5 
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+            Execution mode: vectorized
+            Local Work:
+              Map Reduce Local Work
+        Map 4 
             Map Operator Tree:
                 TableScan
                   alias: src
@@ -586,8 +605,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL 
SORT, 2)
-        Reducer 3 <- Reducer 2 (GROUP, 2)
+        Reducer 2 <- Map 1 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -602,66 +620,42 @@ STAGE PLANS:
                       expressions: key (type: string), value (type: string), 
ds (type: string)
                       outputColumnNames: _col0, _col1, _col2
                       Statistics: Num rows: 2000 Data size: 21248 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
-                        Statistics: Num rows: 2000 Data size: 21248 Basic 
stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: string), _col2 (type: 
string)
-            Execution mode: vectorized
-        Map 4 
-            Map Operator Tree:
-                TableScan
-                  alias: src1
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: key (type: string)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
-                        Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        outputColumnNames: _col1, _col2
+                        input vertices:
+                          1 Map 3
+                        Statistics: Num rows: 2200 Data size: 23372 Basic 
stats: COMPLETE Column stats: NONE
+                        Map Join Operator
+                          condition map:
+                               Inner Join 0 to 1
+                          keys:
+                            0 _col1 (type: string)
+                            1 _col0 (type: string)
+                          outputColumnNames: _col2
+                          input vertices:
+                            1 Map 4
+                          Statistics: Num rows: 2420 Data size: 25709 Basic 
stats: COMPLETE Column stats: NONE
+                          Group By Operator
+                            aggregations: count()
+                            keys: _col2 (type: string)
+                            mode: hash
+                            outputColumnNames: _col0, _col1
+                            Statistics: Num rows: 2420 Data size: 25709 Basic 
stats: COMPLETE Column stats: NONE
+                            Reduce Output Operator
+                              key expressions: _col0 (type: string)
+                              sort order: +
+                              Map-reduce partition columns: _col0 (type: 
string)
+                              Statistics: Num rows: 2420 Data size: 25709 
Basic stats: COMPLETE Column stats: NONE
+                              value expressions: _col1 (type: bigint)
             Execution mode: vectorized
-        Reducer 2 
             Local Work:
               Map Reduce Local Work
-            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 _col0 (type: string)
-                  1 _col0 (type: string)
-                outputColumnNames: _col1, _col2
-                Statistics: Num rows: 2200 Data size: 23372 Basic stats: 
COMPLETE Column stats: NONE
-                Map Join Operator
-                  condition map:
-                       Inner Join 0 to 1
-                  keys:
-                    0 _col1 (type: string)
-                    1 _col0 (type: string)
-                  outputColumnNames: _col2
-                  input vertices:
-                    1 Map 5
-                  Statistics: Num rows: 2420 Data size: 25709 Basic stats: 
COMPLETE Column stats: NONE
-                  Group By Operator
-                    aggregations: count()
-                    keys: _col2 (type: string)
-                    mode: hash
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 2420 Data size: 25709 Basic stats: 
COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string)
-                      sort order: +
-                      Map-reduce partition columns: _col0 (type: string)
-                      Statistics: Num rows: 2420 Data size: 25709 Basic stats: 
COMPLETE Column stats: NONE
-                      value expressions: _col1 (type: bigint)
-        Reducer 3 
+        Reducer 2 
             Execution mode: vectorized
             Reduce Operator Tree:
               Group By Operator

Reply via email to