This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 964f08a  HIVE-22538: RS deduplication does not always enforce 
hive.optimize.reducededuplication.min.reducer (Krisztian Kasa, reviewed by 
Jesus Camacho Rodriguez)
964f08a is described below

commit 964f08ae733b037c6e58dfb4ed149ccad2d3ddc0
Author: Krisztian Kasa <[email protected]>
AuthorDate: Tue Jan 28 12:59:01 2020 -0800

    HIVE-22538: RS deduplication does not always enforce 
hive.optimize.reducededuplication.min.reducer (Krisztian Kasa, reviewed by 
Jesus Camacho Rodriguez)
    
    Close apache/hive#877
    Close apache/hive#855
---
 .../org/apache/hadoop/hive/ql/TestAcidOnTez.java   |   3 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java    |   6 +-
 .../apache/hadoop/hive/ql/optimizer/Optimizer.java |   2 +-
 .../correlation/AbstractCorrelationProcCtx.java    |  21 +--
 .../apache/hadoop/hive/ql/parse/ParseContext.java  |  17 +--
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  14 +-
 .../apache/hadoop/hive/ql/parse/TaskCompiler.java  |   8 +-
 .../apache/hadoop/hive/ql/parse/TezCompiler.java   |   3 +-
 .../org/apache/hadoop/hive/ql/plan/PlanUtils.java  |  20 ++-
 ql/src/test/queries/clientpositive/clusterctas.q   |  12 ++
 .../test/results/clientpositive/clusterctas.q.out  | 142 ++++++++++++++++++++
 .../clientpositive/llap/check_constraint.q.out     |  47 ++++---
 .../results/clientpositive/llap/clusterctas.q.out  | 145 +++++++++++++++++++++
 .../llap/enforce_constraint_notnull.q.out          |  51 +++++---
 .../llap/materialized_view_create_rewrite_4.q.out  | 137 +++++++++++--------
 .../llap/materialized_view_rewrite_window.q.out    |  82 +++++++-----
 .../clientpositive/llap/semijoin_reddedup.q.out    | 129 ++++++++++--------
 17 files changed, 604 insertions(+), 235 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 2868427..056cd27 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -699,7 +699,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  
~/dev/hiverwgit/itests/h
     setupTez(confForTez);
     int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
     runStatementOnDriver("delete from " + Table.ACIDTBL, confForTez);
-    runStatementOnDriver("insert into " + Table.ACIDTBL + 
TestTxnCommands2.makeValuesClause(values));//make sure both buckets are not 
empty
+    //make sure both buckets are not empty
+    runStatementOnDriver("insert into " + Table.ACIDTBL + 
TestTxnCommands2.makeValuesClause(values), confForTez);
     runStatementOnDriver("drop table if exists T", confForTez);
     /*
     With bucketed target table Union All is not removed
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 3fa61d3..398698e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -576,11 +576,7 @@ public class OrcRecordUpdater implements RecordUpdater {
           if (options.isWritingBase()) {
             // With insert overwrite we need the empty file to delete the 
previous content of the table
             LOG.debug("Empty file has been created for overwrite: {}", path);
-
-            OrcFile.WriterOptions wo = 
OrcFile.writerOptions(this.options.getConfiguration())
-                .inspector(rowInspector)
-                .callback(new OrcRecordUpdater.KeyIndexBuilder("testEmpty"));
-            OrcFile.createWriter(path, wo).close();
+            OrcFile.createWriter(path, writerOptions).close();
           } else {
             LOG.debug("No insert events in path: {}.. Deleting..", path);
             fs.delete(path, false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 25e9cd0..da277d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -191,7 +191,7 @@ public class Optimizer {
       transformations.add(new FixedBucketPruningOptimizer(compatMode));
     }
 
-    if(HiveConf.getBoolVar(hiveConf, 
HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION) || pctx.hasAcidWrite()) {
+    if(HiveConf.getBoolVar(hiveConf, 
HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
       transformations.add(new ReduceSinkDeDuplication());
     }
     transformations.add(new NonBlockingOpDeDupProc());
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
index 4e72c4c..4208abe 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
@@ -28,12 +28,8 @@ import java.util.Set;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 abstract class AbstractCorrelationProcCtx implements NodeProcessorCtx {
-  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractCorrelationProcCtx.class);
   private ParseContext pctx;
   // For queries using script, the optimization cannot be applied without 
user's confirmation
   // If script preserves alias and value for columns related to keys, user can 
set this true
@@ -49,22 +45,7 @@ abstract class AbstractCorrelationProcCtx implements 
NodeProcessorCtx {
   public AbstractCorrelationProcCtx(ParseContext pctx) {
     removedOps = new HashSet<Operator<?>>();
     trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST);
-    if(pctx.hasAcidWrite()) {
-      StringBuilder tblNames = new StringBuilder();
-      for(FileSinkDesc fsd : pctx.getAcidSinks()) {
-        if(fsd.getTable() != null) {
-          
tblNames.append(fsd.getTable().getDbName()).append('.').append(fsd.getTable().getTableName()).append(',');
-        }
-      }
-      if(tblNames.length() > 0) {
-        tblNames.setLength(tblNames.length() - 1);//traling ','
-      }
-      LOG.info("Overriding " + HIVEOPTREDUCEDEDUPLICATIONMINREDUCER + " to 1 
due to a write to transactional table(s) " + tblNames);
-      minReducer = 1;
-    }
-    else {
-      minReducer = 
pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
-    }
+    minReducer = 
pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
     isMapAggr = pctx.getConf().getBoolVar(HIVEMAPSIDEAGGREGATE);
     this.pctx = pctx;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 91bdbfd..bef0217 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -46,17 +46,14 @@ import 
org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import 
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -128,7 +125,6 @@ public class ParseContext {
   private Map<SelectOperator, Table> viewProjectToViewSchema;
   private ColumnAccessInfo columnAccessInfo;
   private boolean needViewColumnAuthorization;
-  private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
 
   private Map<ReduceSinkOperator, RuntimeValuesInfo> rsToRuntimeValuesInfo =
           new LinkedHashMap<ReduceSinkOperator, RuntimeValuesInfo>();
@@ -199,7 +195,7 @@ public class ParseContext {
       AnalyzeRewriteContext analyzeRewrite, CreateTableDesc createTableDesc,
       CreateViewDesc createViewDesc, MaterializedViewUpdateDesc 
materializedViewUpdateDesc,
       QueryProperties queryProperties,
-      Map<SelectOperator, Table> viewProjectToTableSchema, Set<FileSinkDesc> 
acidFileSinks) {
+      Map<SelectOperator, Table> viewProjectToTableSchema) {
     this.queryState = queryState;
     this.conf = queryState.getConf();
     this.opToPartPruner = opToPartPruner;
@@ -239,17 +235,8 @@ public class ParseContext {
       // authorization info.
       this.columnAccessInfo = new ColumnAccessInfo();
     }
-    if(acidFileSinks != null && !acidFileSinks.isEmpty()) {
-      this.acidFileSinks = new HashSet<>();
-      this.acidFileSinks.addAll(acidFileSinks);
-    }
-  }
-  public Set<FileSinkDesc> getAcidSinks() {
-    return acidFileSinks;
-  }
-  public boolean hasAcidWrite() {
-    return !acidFileSinks.isEmpty();
   }
+
   /**
    * @return the context
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 4afd454..5fcc367 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -559,7 +559,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
         opToPartToSkewedPruner, viewAliasToInput, 
reduceSinkOperatorsAddedByEnforceBucketingSorting,
         analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,
-        queryProperties, viewProjectToTableSchema, acidFileSinks);
+        queryProperties, viewProjectToTableSchema);
   }
 
   public CompilationOpContext getOpContext() {
@@ -6853,10 +6853,15 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     }
 
     if (enforceBucketing) {
+      Operation acidOp = AcidUtils.isFullAcidTable(dest_tab) ? 
getAcidType(table_desc.getOutputFileFormatClass(),
+              dest, AcidUtils.isInsertOnlyTable(dest_tab)) : 
Operation.NOT_ACID;
       int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
       if (conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS) > 0) {
         maxReducers = conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
       }
+      if (acidOp == Operation.UPDATE || acidOp == Operation.DELETE) {
+        maxReducers = 1;
+      }
       int numBuckets = dest_tab.getNumBuckets();
       if (numBuckets > maxReducers) {
         LOG.debug("numBuckets is {} and maxReducers is {}", numBuckets, 
maxReducers);
@@ -6871,7 +6876,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
           numFiles = totalFiles / maxReducers;
         }
       }
-      else {
+      else if (acidOp == Operation.NOT_ACID || acidOp == Operation.INSERT) {
         maxReducers = numBuckets;
       }
 
@@ -6883,8 +6888,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       }
       input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), 
nullOrder.toString(),
           maxReducers,
-          (AcidUtils.isFullAcidTable(dest_tab) ? 
getAcidType(table_desc.getOutputFileFormatClass(),
-              dest, AcidUtils.isInsertOnlyTable(dest_tab)) : 
AcidUtils.Operation.NOT_ACID));
+              acidOp);
       
reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
       ctx.setMultiFileSpray(multiFileSpray);
       ctx.setNumFiles(numFiles);
@@ -12540,7 +12544,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, 
opToPartToSkewedPruner,
         viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
         analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,
-        queryProperties, viewProjectToTableSchema, acidFileSinks);
+        queryProperties, viewProjectToTableSchema);
 
     // Set the semijoin hints in parse context
     
pCtx.setSemiJoinHints(parseSemiJoinHint(getQB().getParseInfo().getHintList()));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 1e1d65b..2f3fc6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -702,15 +702,12 @@ public abstract class TaskCompiler {
         !HiveConf.getBoolVar(hConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) {
       new SortedDynPartitionOptimizer().transform(parseContext);
 
-      if(HiveConf.getBoolVar(hConf, 
HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)
-          || parseContext.hasAcidWrite()) {
-
+      if(HiveConf.getBoolVar(hConf, 
HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
         // Dynamic sort partition adds an extra RS therefore need to de-dup
         new ReduceSinkDeDuplication().transform(parseContext);
         // there is an issue with dedup logic wherein SELECT is created with 
wrong columns
         // NonBlockingOpDeDupProc fixes that
         new NonBlockingOpDeDupProc().transform(parseContext);
-
       }
     }
   }
@@ -732,8 +729,7 @@ public abstract class TaskCompiler {
         pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(),
         pCtx.getAnalyzeRewrite(), pCtx.getCreateTable(),
         pCtx.getCreateViewDesc(), pCtx.getMaterializedViewUpdateDesc(),
-        pCtx.getQueryProperties(), pCtx.getViewProjectToTableSchema(),
-        pCtx.getAcidSinks());
+        pCtx.getQueryProperties(), pCtx.getViewProjectToTableSchema());
     clone.setFetchTask(pCtx.getFetchTask());
     clone.setLineageInfo(pCtx.getLineageInfo());
     clone.setMapJoinOps(pCtx.getMapJoinOps());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index ff81543..5a78ed5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -197,8 +197,7 @@ public class TezCompiler extends TaskCompiler {
       perfLogger.PerfLogEnd(this.getClass().getName(), 
PerfLogger.TEZ_COMPILER, "Sorted dynamic partition optimization");
     }
 
-    if(HiveConf.getBoolVar(procCtx.conf, 
HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)
-        || procCtx.parseContext.hasAcidWrite()) {
+    if(HiveConf.getBoolVar(procCtx.conf, 
HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
       perfLogger.PerfLogBegin(this.getClass().getName(), 
PerfLogger.TEZ_COMPILER);
       // Dynamic sort partition adds an extra RS therefore need to de-dup
       new ReduceSinkDeDuplication().transform(procCtx.parseContext);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index fb0a422..980f39b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -710,13 +711,18 @@ public final class PlanUtils {
       List<String> outputColumnNames, boolean includeKeyCols, int tag,
       List<ExprNodeDesc> partitionCols, String order, String nullOrder, 
NullOrdering defaultNullOrder,
       int numReducers, AcidUtils.Operation writeType) {
-    return getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
-        new ArrayList<List<Integer>>(),
-        includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) :
-          new ArrayList<String>(),
-        includeKeyCols ? outputColumnNames.subList(keyCols.size(),
-            outputColumnNames.size()) : outputColumnNames,
-        includeKeyCols, tag, partitionCols, order, nullOrder, 
defaultNullOrder, numReducers, writeType);
+    ReduceSinkDesc reduceSinkDesc = getReduceSinkDesc(keyCols, keyCols.size(), 
valueCols,
+            new ArrayList<List<Integer>>(),
+            includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) :
+                    new ArrayList<String>(),
+            includeKeyCols ? outputColumnNames.subList(keyCols.size(),
+                    outputColumnNames.size()) : outputColumnNames,
+            includeKeyCols, tag, partitionCols, order, nullOrder, 
defaultNullOrder, numReducers, writeType);
+    if (writeType == AcidUtils.Operation.UPDATE || writeType == 
AcidUtils.Operation.DELETE) {
+      
reduceSinkDesc.setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.FIXED));
+      reduceSinkDesc.setNumReducers(1);
+    }
+    return reduceSinkDesc;
   }
 
   /**
diff --git a/ql/src/test/queries/clientpositive/clusterctas.q 
b/ql/src/test/queries/clientpositive/clusterctas.q
new file mode 100644
index 0000000..d4e45e0
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/clusterctas.q
@@ -0,0 +1,12 @@
+--! qt:dataset:src
+
+set hive.cbo.enable=false;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+EXPLAIN
+CREATE TABLE x STORED AS ORC TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key;
+CREATE TABLE x STORED AS ORC TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key;
+DROP TABLE x;
diff --git a/ql/src/test/results/clientpositive/clusterctas.q.out 
b/ql/src/test/results/clientpositive/clusterctas.q.out
new file mode 100644
index 0000000..9d76bc5
--- /dev/null
+++ b/ql/src/test/results/clientpositive/clusterctas.q.out
@@ -0,0 +1,142 @@
+PREHOOK: query: EXPLAIN
+CREATE TABLE x STORED AS ORC TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@x
+POSTHOOK: query: EXPLAIN
+CREATE TABLE x STORED AS ORC TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@x
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-0, Stage-3
+  Stage-2 depends on stages: Stage-4
+  Stage-3 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: x
+            Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE 
Column stats: COMPLETE
+            Select Operator
+              expressions: key (type: string), value (type: string)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE 
Column stats: COMPLETE
+              Reduce Output Operator
+                key expressions: _col0 (type: string)
+                null sort order: a
+                sort order: +
+                Map-reduce partition columns: _col0 (type: string)
+                Statistics: Num rows: 500 Data size: 89000 Basic stats: 
COMPLETE Column stats: COMPLETE
+                value expressions: _col1 (type: string)
+      Execution mode: vectorized
+      Reduce Operator Tree:
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: 
string)
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE 
Column stats: COMPLETE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE 
Column stats: COMPLETE
+            table:
+                input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                name: default.x
+            Write Type: INSERT
+          Select Operator
+            expressions: _col0 (type: string), _col1 (type: string)
+            outputColumnNames: col1, col2
+            Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE 
Column stats: COMPLETE
+            Group By Operator
+              aggregations: compute_stats(col1, 'hll'), compute_stats(col2, 
'hll')
+              minReductionHashAggr: 0.99
+              mode: hash
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 1 Data size: 880 Basic stats: COMPLETE 
Column stats: COMPLETE
+              File Output Operator
+                compressed: false
+                table:
+                    input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                    output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                    serde: 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-0
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+          Write Type: INSERT
+
+  Stage: Stage-4
+    Create Table
+      columns: key string, value string
+      name: default.x
+      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+      serde name: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+      table properties:
+        transactional true
+
+  Stage: Stage-2
+    Stats Work
+      Basic Stats Work:
+      Column Stats Desc:
+          Columns: key, value
+          Column Types: string, string
+          Table: default.x
+
+  Stage: Stage-3
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              null sort order: 
+              sort order: 
+              Statistics: Num rows: 1 Data size: 880 Basic stats: COMPLETE 
Column stats: COMPLETE
+              value expressions: _col0 (type: 
struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>)
+      Execution mode: vectorized
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: compute_stats(VALUE._col0), compute_stats(VALUE._col1)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 1 Data size: 880 Basic stats: COMPLETE Column 
stats: COMPLETE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 1 Data size: 880 Basic stats: COMPLETE 
Column stats: COMPLETE
+            table:
+                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+PREHOOK: query: CREATE TABLE x STORED AS ORC 
TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@x
+POSTHOOK: query: CREATE TABLE x STORED AS ORC 
TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@x
+POSTHOOK: Lineage: x.key SIMPLE [(src)x.FieldSchema(name:key, type:string, 
comment:default), ]
+POSTHOOK: Lineage: x.value SIMPLE [(src)x.FieldSchema(name:value, type:string, 
comment:default), ]
+PREHOOK: query: DROP TABLE x
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@x
+PREHOOK: Output: default@x
+POSTHOOK: query: DROP TABLE x
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@x
+POSTHOOK: Output: default@x
diff --git a/ql/src/test/results/clientpositive/llap/check_constraint.q.out 
b/ql/src/test/results/clientpositive/llap/check_constraint.q.out
index 9f82a10..955a071 100644
--- a/ql/src/test/results/clientpositive/llap/check_constraint.q.out
+++ b/ql/src/test/results/clientpositive/llap/check_constraint.q.out
@@ -1751,6 +1751,7 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1795,26 +1796,40 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1, _col2, _col3
                 Statistics: Num rows: 250 Data size: 73500 Basic stats: 
COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: _col2 (type: int), _col3 (type: decimal(5,2)), 
_col1 (type: string)
-                  outputColumnNames: _col0, _col1, _col2
-                  Statistics: Num rows: 250 Data size: 51750 Basic stats: 
COMPLETE Column stats: COMPLETE
-                  Limit
-                    Number of rows: 10
-                    Statistics: Num rows: 10 Data size: 2070 Basic stats: 
COMPLETE Column stats: COMPLETE
-                    Filter Operator
-                      predicate: enforce_constraint((_col1 is not null and 
(_col1 >= CAST( _col0 AS decimal(5,2))) is not false)) (type: boolean)
-                      Statistics: Num rows: 5 Data size: 1035 Basic stats: 
COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        null sort order: a
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 5 Data size: 1035 Basic stats: 
COMPLETE Column stats: COMPLETE
-                        value expressions: _col1 (type: decimal(5,2)), _col2 
(type: string)
+                  expressions: _col2 (type: int), _col3 (type: decimal(5,2)), 
_col1 (type: string), _col0 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  Statistics: Num rows: 250 Data size: 73500 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col3 (type: string), _col2 (type: string)
+                    null sort order: zz
+                    sort order: ++
+                    Statistics: Num rows: 250 Data size: 73500 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    TopN Hash Memory Usage: 0.1
+                    value expressions: _col0 (type: int), _col1 (type: 
decimal(5,2))
         Reducer 3 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
+                expressions: VALUE._col0 (type: int), VALUE._col1 (type: 
decimal(5,2)), KEY.reducesinkkey1 (type: string)
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 250 Data size: 73500 Basic stats: 
COMPLETE Column stats: COMPLETE
+                Limit
+                  Number of rows: 10
+                  Statistics: Num rows: 10 Data size: 2940 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: enforce_constraint((_col1 is not null and 
(_col1 >= CAST( _col0 AS decimal(5,2))) is not false)) (type: boolean)
+                    Statistics: Num rows: 5 Data size: 1470 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int)
+                      null sort order: a
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: int)
+                      Statistics: Num rows: 5 Data size: 1470 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      value expressions: _col1 (type: decimal(5,2)), _col2 
(type: string)
+        Reducer 4 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Select Operator
                 expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 
(type: decimal(5,2)), CAST( VALUE._col1 AS varchar(128)) (type: varchar(128))
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 5 Data size: 1640 Basic stats: COMPLETE 
Column stats: COMPLETE
diff --git a/ql/src/test/results/clientpositive/llap/clusterctas.q.out 
b/ql/src/test/results/clientpositive/llap/clusterctas.q.out
new file mode 100644
index 0000000..40ceee2
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/clusterctas.q.out
@@ -0,0 +1,145 @@
+PREHOOK: query: EXPLAIN
+CREATE TABLE x STORED AS ORC TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@x
+POSTHOOK: query: EXPLAIN
+CREATE TABLE x STORED AS ORC TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@x
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-0, Stage-2
+  Stage-3 depends on stages: Stage-4
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string)
+                      null sort order: a
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      value expressions: _col1 (type: string)
+            Execution mode: vectorized, llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 
(type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 500 Data size: 89000 Basic stats: 
COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: 
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.x
+                  Write Type: INSERT
+                Select Operator
+                  expressions: _col0 (type: string), _col1 (type: string)
+                  outputColumnNames: col1, col2
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: compute_stats(col1, 'hll'), 
compute_stats(col2, 'hll')
+                    minReductionHashAggr: 0.99
+                    mode: hash
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 1 Data size: 880 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      null sort order: 
+                      sort order: 
+                      Statistics: Num rows: 1 Data size: 880 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      value expressions: _col0 (type: 
struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>)
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: compute_stats(VALUE._col0), 
compute_stats(VALUE._col1)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 880 Basic stats: COMPLETE 
Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 880 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  table:
+                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-4
+    Create Table
+      columns: key string, value string
+      name: default.x
+      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+      serde name: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+      table properties:
+        transactional true
+
+  Stage: Stage-3
+    Stats Work
+      Basic Stats Work:
+      Column Stats Desc:
+          Columns: key, value
+          Column Types: string, string
+          Table: default.x
+
+  Stage: Stage-0
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+          Write Type: INSERT
+
+PREHOOK: query: CREATE TABLE x STORED AS ORC 
TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@x
+POSTHOOK: query: CREATE TABLE x STORED AS ORC 
TBLPROPERTIES('transactional'='true') AS
+SELECT * FROM SRC x CLUSTER BY x.key
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@x
+POSTHOOK: Lineage: x.key SIMPLE [(src)x.FieldSchema(name:key, type:string, 
comment:default), ]
+POSTHOOK: Lineage: x.value SIMPLE [(src)x.FieldSchema(name:value, type:string, 
comment:default), ]
+PREHOOK: query: DROP TABLE x
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@x
+PREHOOK: Output: default@x
+POSTHOOK: query: DROP TABLE x
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@x
+POSTHOOK: Output: default@x
diff --git 
a/ql/src/test/results/clientpositive/llap/enforce_constraint_notnull.q.out 
b/ql/src/test/results/clientpositive/llap/enforce_constraint_notnull.q.out
index 8ccec3a..3f99d0c 100644
--- a/ql/src/test/results/clientpositive/llap/enforce_constraint_notnull.q.out
+++ b/ql/src/test/results/clientpositive/llap/enforce_constraint_notnull.q.out
@@ -3075,7 +3075,8 @@ STAGE PLANS:
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
+        Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -3117,23 +3118,37 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 250 Data size: 44500 Basic stats: 
COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: UDFToInteger(_col0) (type: int), CAST( _col0 AS 
decimal(5,2)) (type: decimal(5,2)), _col1 (type: string)
-                  outputColumnNames: _col0, _col1, _col2
-                  Statistics: Num rows: 250 Data size: 51750 Basic stats: 
COMPLETE Column stats: COMPLETE
-                  Limit
-                    Number of rows: 2
-                    Statistics: Num rows: 2 Data size: 414 Basic stats: 
COMPLETE Column stats: COMPLETE
-                    Filter Operator
-                      predicate: enforce_constraint((_col1 is not null and 
_col2 is not null)) (type: boolean)
-                      Statistics: Num rows: 1 Data size: 207 Basic stats: 
COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        null sort order: a
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 1 Data size: 207 Basic stats: 
COMPLETE Column stats: COMPLETE
-                        value expressions: _col1 (type: decimal(5,2)), _col2 
(type: string)
+                  expressions: UDFToInteger(_col0) (type: int), CAST( _col0 AS 
decimal(5,2)) (type: decimal(5,2)), _col1 (type: string), _col0 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  Statistics: Num rows: 250 Data size: 73500 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col3 (type: string)
+                    null sort order: z
+                    sort order: +
+                    Statistics: Num rows: 250 Data size: 73500 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    TopN Hash Memory Usage: 0.1
+                    value expressions: _col0 (type: int), _col1 (type: 
decimal(5,2)), _col2 (type: string)
         Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: VALUE._col0 (type: int), VALUE._col1 (type: 
decimal(5,2)), VALUE._col2 (type: string)
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 250 Data size: 73500 Basic stats: 
COMPLETE Column stats: COMPLETE
+                Limit
+                  Number of rows: 2
+                  Statistics: Num rows: 2 Data size: 588 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  Filter Operator
+                    predicate: enforce_constraint((_col1 is not null and _col2 
is not null)) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 294 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int)
+                      null sort order: a
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: int)
+                      Statistics: Num rows: 1 Data size: 294 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      value expressions: _col1 (type: decimal(5,2)), _col2 
(type: string)
+        Reducer 4 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
@@ -3164,7 +3179,7 @@ STAGE PLANS:
                       sort order: 
                       Statistics: Num rows: 1 Data size: 1496 Basic stats: 
COMPLETE Column stats: COMPLETE
                       value expressions: _col0 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:decimal(5,2),max:decimal(5,2),countnulls:bigint,bitvector:binary>),
 _col2 (type: 
struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>)
-        Reducer 4 
+        Reducer 5 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
diff --git 
a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
 
b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
index 8196f11..25ce6d6 100644
--- 
a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
+++ 
b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
@@ -87,8 +87,9 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -112,7 +113,7 @@ STAGE PLANS:
                         Statistics: Num rows: 5 Data size: 20 Basic stats: 
COMPLETE Column stats: COMPLETE
             Execution mode: llap
             LLAP IO: may be used (ACID table)
-        Map 4 
+        Map 5 
             Map Operator Tree:
                 TableScan
                   alias: cmv_basetable_2_n2
@@ -183,20 +184,30 @@ STAGE PLANS:
                   Statistics: Num rows: 2 Data size: 248 Basic stats: COMPLETE 
Column stats: COMPLETE
                   Group By Operator
                     aggregations: compute_stats(col1, 'hll'), 
compute_stats(col2, 'hll'), compute_stats(col3, 'hll')
-                    mode: complete
+                    minReductionHashAggr: 0.5
+                    mode: hash
                     outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 1 Data size: 1480 Basic stats: 
COMPLETE Column stats: COMPLETE
-                    Select Operator
-                      expressions: _col0 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:decimal(10,2),max:decimal(10,2),countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>),
 _col2 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>)
-                      outputColumnNames: _col0, _col1, _col2
+                    Reduce Output Operator
+                      null sort order: 
+                      sort order: 
                       Statistics: Num rows: 1 Data size: 1480 Basic stats: 
COMPLETE Column stats: COMPLETE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 1 Data size: 1480 Basic stats: 
COMPLETE Column stats: COMPLETE
-                        table:
-                            input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                            output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                            serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      value expressions: _col0 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:decimal(10,2),max:decimal(10,2),countnulls:bigint,bitvector:binary>),
 _col2 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>)
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: compute_stats(VALUE._col0), 
compute_stats(VALUE._col1), compute_stats(VALUE._col2)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 1528 Basic stats: COMPLETE 
Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 1528 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-2
     Dependency Collection
@@ -275,10 +286,10 @@ Table Type:               MATERIALIZED_VIEW
 Table Parameters:               
        COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"_c2\":\"true\",\"a\":\"true\",\"c\":\"true\"}}
        bucketing_version       2                   
-       numFiles                1                   
+       numFiles                2                   
        numRows                 2                   
        rawDataSize             0                   
-       totalSize               819                 
+       totalSize               1539                
        transactional           true                
        transactional_properties        default             
 #### A masked pattern was here ####
@@ -514,10 +525,10 @@ Table Type:               MATERIALIZED_VIEW
 Table Parameters:               
        COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"_c2\":\"true\",\"a\":\"true\",\"c\":\"true\"}}
        bucketing_version       2                   
-       numFiles                1                   
+       numFiles                2                   
        numRows                 2                   
        rawDataSize             0                   
-       totalSize               819                 
+       totalSize               1539                
        transactional           true                
        transactional_properties        default             
 #### A masked pattern was here ####
@@ -997,10 +1008,10 @@ Table Type:              MATERIALIZED_VIEW
 Table Parameters:               
        COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
        bucketing_version       2                   
-       numFiles                2                   
+       numFiles                3                   
        numRows                 3                   
        rawDataSize             0                   
-       totalSize               1576                
+       totalSize               2296                
        transactional           true                
        transactional_properties        default             
 #### A masked pattern was here ####
@@ -1081,8 +1092,8 @@ POSTHOOK: Input: default@cmv_basetable_2_n2
 POSTHOOK: Input: default@cmv_basetable_n5
 POSTHOOK: Input: default@cmv_mat_view_n5
 #### A masked pattern was here ####
-1      2
 3      6
+1      2
 3      2
 PREHOOK: query: UPDATE cmv_basetable_2_n2 SET a=2 WHERE a=1
 PREHOOK: type: QUERY
@@ -1116,8 +1127,9 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1141,7 +1153,7 @@ STAGE PLANS:
                         Statistics: Num rows: 5 Data size: 20 Basic stats: 
COMPLETE Column stats: COMPLETE
             Execution mode: llap
             LLAP IO: may be used (ACID table)
-        Map 4 
+        Map 5 
             Map Operator Tree:
                 TableScan
                   alias: cmv_basetable_2_n2
@@ -1212,20 +1224,30 @@ STAGE PLANS:
                   Statistics: Num rows: 2 Data size: 248 Basic stats: COMPLETE 
Column stats: COMPLETE
                   Group By Operator
                     aggregations: compute_stats(a, 'hll'), compute_stats(c, 
'hll'), compute_stats(_c2, 'hll')
-                    mode: complete
+                    minReductionHashAggr: 0.5
+                    mode: hash
                     outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 1 Data size: 1480 Basic stats: 
COMPLETE Column stats: COMPLETE
-                    Select Operator
-                      expressions: _col0 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:decimal(10,2),max:decimal(10,2),countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>),
 _col2 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>)
-                      outputColumnNames: _col0, _col1, _col2
+                    Reduce Output Operator
+                      null sort order: 
+                      sort order: 
                       Statistics: Num rows: 1 Data size: 1480 Basic stats: 
COMPLETE Column stats: COMPLETE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 1 Data size: 1480 Basic stats: 
COMPLETE Column stats: COMPLETE
-                        table:
-                            input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                            output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                            serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      value expressions: _col0 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:decimal(10,2),max:decimal(10,2),countnulls:bigint,bitvector:binary>),
 _col2 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>)
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: compute_stats(VALUE._col0), 
compute_stats(VALUE._col1), compute_stats(VALUE._col2)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 1528 Basic stats: COMPLETE 
Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 1528 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-2
     Dependency Collection
@@ -1287,10 +1309,10 @@ Table Type:             MATERIALIZED_VIEW
 Table Parameters:               
        COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"_c2\":\"true\",\"a\":\"true\",\"c\":\"true\"}}
        bucketing_version       2                   
-       numFiles                1                   
+       numFiles                2                   
        numRows                 3                   
        rawDataSize             0                   
-       totalSize               822                 
+       totalSize               1041                
        transactional           true                
        transactional_properties        default             
 #### A masked pattern was here ####
@@ -1406,8 +1428,9 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1431,7 +1454,7 @@ STAGE PLANS:
                         Statistics: Num rows: 5 Data size: 20 Basic stats: 
COMPLETE Column stats: COMPLETE
             Execution mode: llap
             LLAP IO: may be used (ACID table)
-        Map 4 
+        Map 5 
             Map Operator Tree:
                 TableScan
                   alias: cmv_basetable_2_n2
@@ -1502,20 +1525,30 @@ STAGE PLANS:
                   Statistics: Num rows: 2 Data size: 248 Basic stats: COMPLETE 
Column stats: COMPLETE
                   Group By Operator
                     aggregations: compute_stats(a, 'hll'), compute_stats(c, 
'hll'), compute_stats(_c2, 'hll')
-                    mode: complete
+                    minReductionHashAggr: 0.5
+                    mode: hash
                     outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 1 Data size: 1480 Basic stats: 
COMPLETE Column stats: COMPLETE
-                    Select Operator
-                      expressions: _col0 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:decimal(10,2),max:decimal(10,2),countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>),
 _col2 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>)
-                      outputColumnNames: _col0, _col1, _col2
+                    Reduce Output Operator
+                      null sort order: 
+                      sort order: 
                       Statistics: Num rows: 1 Data size: 1480 Basic stats: 
COMPLETE Column stats: COMPLETE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 1 Data size: 1480 Basic stats: 
COMPLETE Column stats: COMPLETE
-                        table:
-                            input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                            output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                            serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      value expressions: _col0 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:decimal(10,2),max:decimal(10,2),countnulls:bigint,bitvector:binary>),
 _col2 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>)
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: compute_stats(VALUE._col0), 
compute_stats(VALUE._col1), compute_stats(VALUE._col2)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 1528 Basic stats: COMPLETE 
Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 1528 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-2
     Dependency Collection
@@ -1577,10 +1610,10 @@ Table Type:             MATERIALIZED_VIEW
 Table Parameters:               
        COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"_c2\":\"true\",\"a\":\"true\",\"c\":\"true\"}}
        bucketing_version       2                   
-       numFiles                1                   
+       numFiles                2                   
        numRows                 2                   
        rawDataSize             0                   
-       totalSize               820                 
+       totalSize               1039                
        transactional           true                
        transactional_properties        default             
 #### A masked pattern was here ####
@@ -1987,10 +2020,10 @@ Table Type:             MATERIALIZED_VIEW
 Table Parameters:               
        COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
        bucketing_version       2                   
-       numFiles                2                   
+       numFiles                3                   
        numRows                 3                   
        rawDataSize             0                   
-       totalSize               1576                
+       totalSize               1795                
        transactional           true                
        transactional_properties        default             
 #### A masked pattern was here ####
diff --git 
a/ql/src/test/results/clientpositive/llap/materialized_view_rewrite_window.q.out
 
b/ql/src/test/results/clientpositive/llap/materialized_view_rewrite_window.q.out
index fd330b0..4e35bba 100644
--- 
a/ql/src/test/results/clientpositive/llap/materialized_view_rewrite_window.q.out
+++ 
b/ql/src/test/results/clientpositive/llap/materialized_view_rewrite_window.q.out
@@ -239,9 +239,10 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 5 <- Map 4 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
+        Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -265,7 +266,7 @@ STAGE PLANS:
                         Statistics: Num rows: 5 Data size: 460 Basic stats: 
COMPLETE Column stats: COMPLETE
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
-        Map 4 
+        Map 5 
             Map Operator Tree:
                 TableScan
                   alias: tv_view_data
@@ -328,21 +329,31 @@ STAGE PLANS:
                   Statistics: Num rows: 2 Data size: 240 Basic stats: COMPLETE 
Column stats: COMPLETE
                   Group By Operator
                     aggregations: compute_stats(quartile, 'hll'), 
compute_stats(total, 'hll')
-                    mode: complete
+                    minReductionHashAggr: 0.5
+                    mode: hash
                     outputColumnNames: _col0, _col1
                     Statistics: Num rows: 1 Data size: 1056 Basic stats: 
COMPLETE Column stats: COMPLETE
-                    Select Operator
-                      expressions: _col0 (type: 
struct<columntype:string,min:decimal(12,1),max:decimal(12,1),countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>)
-                      outputColumnNames: _col0, _col1
+                    Reduce Output Operator
+                      null sort order: 
+                      sort order: 
                       Statistics: Num rows: 1 Data size: 1056 Basic stats: 
COMPLETE Column stats: COMPLETE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 1 Data size: 1056 Basic stats: 
COMPLETE Column stats: COMPLETE
-                        table:
-                            input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                            output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                            serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 5 
+                      value expressions: _col0 (type: 
struct<columntype:string,min:decimal(12,1),max:decimal(12,1),countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>)
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: compute_stats(VALUE._col0), 
compute_stats(VALUE._col1)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 1088 Basic stats: COMPLETE 
Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 1088 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 6 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
@@ -535,9 +546,10 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
-        Reducer 5 <- Map 4 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
+        Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -561,7 +573,7 @@ STAGE PLANS:
                         Statistics: Num rows: 5 Data size: 460 Basic stats: 
COMPLETE Column stats: COMPLETE
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
-        Map 4 
+        Map 5 
             Map Operator Tree:
                 TableScan
                   alias: tv_view_data
@@ -624,21 +636,31 @@ STAGE PLANS:
                   Statistics: Num rows: 2 Data size: 240 Basic stats: COMPLETE 
Column stats: COMPLETE
                   Group By Operator
                     aggregations: compute_stats(quartile, 'hll'), 
compute_stats(total, 'hll')
-                    mode: complete
+                    minReductionHashAggr: 0.5
+                    mode: hash
                     outputColumnNames: _col0, _col1
                     Statistics: Num rows: 1 Data size: 1056 Basic stats: 
COMPLETE Column stats: COMPLETE
-                    Select Operator
-                      expressions: _col0 (type: 
struct<columntype:string,min:decimal(12,1),max:decimal(12,1),countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,numdistinctvalues:bigint,ndvbitvector:binary>)
-                      outputColumnNames: _col0, _col1
+                    Reduce Output Operator
+                      null sort order: 
+                      sort order: 
                       Statistics: Num rows: 1 Data size: 1056 Basic stats: 
COMPLETE Column stats: COMPLETE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 1 Data size: 1056 Basic stats: 
COMPLETE Column stats: COMPLETE
-                        table:
-                            input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
-                            output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                            serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 5 
+                      value expressions: _col0 (type: 
struct<columntype:string,min:decimal(12,1),max:decimal(12,1),countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>)
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: compute_stats(VALUE._col0), 
compute_stats(VALUE._col1)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 1088 Basic stats: COMPLETE 
Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 1088 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 6 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
diff --git a/ql/src/test/results/clientpositive/llap/semijoin_reddedup.q.out 
b/ql/src/test/results/clientpositive/llap/semijoin_reddedup.q.out
index ae95609..fcae6ca 100644
--- a/ql/src/test/results/clientpositive/llap/semijoin_reddedup.q.out
+++ b/ql/src/test/results/clientpositive/llap/semijoin_reddedup.q.out
@@ -258,12 +258,13 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE)
-        Reducer 4 <- Map 10 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
+        Reducer 10 <- Map 9 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 10 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Map 11 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE)
         Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
-        Reducer 6 <- Reducer 5 (CUSTOM_SIMPLE_EDGE)
-        Reducer 9 <- Map 8 (SIMPLE_EDGE)
+        Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
+        Reducer 7 <- Reducer 6 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -288,7 +289,7 @@ STAGE PLANS:
                         value expressions: _col0 (type: bigint), _col2 (type: 
double), _col3 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
-        Map 10 
+        Map 11 
             Map Operator Tree:
                 TableScan
                   alias: l
@@ -310,7 +311,7 @@ STAGE PLANS:
                         value expressions: _col1 (type: double)
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
-        Map 7 
+        Map 8 
             Map Operator Tree:
                 TableScan
                   alias: customer
@@ -332,7 +333,7 @@ STAGE PLANS:
                         value expressions: _col1 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
-        Map 8 
+        Map 9 
             Map Operator Tree:
                 TableScan
                   alias: lineitem
@@ -359,6 +360,28 @@ STAGE PLANS:
                         value expressions: _col1 (type: double)
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
+        Reducer 10 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                keys: KEY._col0 (type: bigint)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2849995116 Data size: 43319925835 Basic 
stats: COMPLETE Column stats: NONE
+                Filter Operator
+                  predicate: (_col1 > 300.0D) (type: boolean)
+                  Statistics: Num rows: 949998372 Data size: 14439975278 Basic 
stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: _col0 (type: bigint)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 949998372 Data size: 14439975278 
Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: bigint)
+                      null sort order: z
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: bigint)
+                      Statistics: Num rows: 949998372 Data size: 14439975278 
Basic stats: COMPLETE Column stats: NONE
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -428,7 +451,7 @@ STAGE PLANS:
                       TopN Hash Memory Usage: 0.1
                       value expressions: _col5 (type: double)
         Reducer 5 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -440,34 +463,48 @@ STAGE PLANS:
                   expressions: _col4 (type: string), _col3 (type: bigint), 
_col2 (type: bigint), _col1 (type: string), _col0 (type: double), _col5 (type: 
double)
                   outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
                   Statistics: Num rows: 3134994695 Data size: 47651919443 
Basic stats: COMPLETE Column stats: NONE
-                  Limit
-                    Number of rows: 100
+                  Reduce Output Operator
+                    key expressions: _col4 (type: double), _col3 (type: string)
+                    null sort order: zz
+                    sort order: -+
+                    Statistics: Num rows: 3134994695 Data size: 47651919443 
Basic stats: COMPLETE Column stats: NONE
+                    TopN Hash Memory Usage: 0.1
+                    value expressions: _col0 (type: string), _col1 (type: 
bigint), _col2 (type: bigint), _col5 (type: double)
+        Reducer 6 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: VALUE._col0 (type: string), VALUE._col1 (type: 
bigint), VALUE._col2 (type: bigint), KEY.reducesinkkey1 (type: string), 
KEY.reducesinkkey0 (type: double), VALUE._col3 (type: double)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                Statistics: Num rows: 3134994695 Data size: 47651919443 Basic 
stats: COMPLETE Column stats: NONE
+                Limit
+                  Number of rows: 100
+                  Statistics: Num rows: 100 Data size: 1500 Basic stats: 
COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
                     Statistics: Num rows: 100 Data size: 1500 Basic stats: 
COMPLETE Column stats: NONE
-                    File Output Operator
-                      compressed: false
-                      Statistics: Num rows: 100 Data size: 1500 Basic stats: 
COMPLETE Column stats: NONE
-                      table:
-                          input format: 
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
-                          output format: 
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
-                          serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                          name: tpch_test.q18_large_volume_customer_cached
-                      Write Type: INSERT
-                    Select Operator
-                      expressions: _col0 (type: string), _col1 (type: bigint), 
_col2 (type: bigint), _col3 (type: string), _col4 (type: double), _col5 (type: 
double)
-                      outputColumnNames: col1, col2, col3, col4, col5, col6
-                      Statistics: Num rows: 100 Data size: 1500 Basic stats: 
COMPLETE Column stats: NONE
-                      Group By Operator
-                        aggregations: compute_stats(col1, 'hll'), 
compute_stats(col2, 'hll'), compute_stats(col3, 'hll'), compute_stats(col4, 
'hll'), compute_stats(col5, 'hll'), compute_stats(col6, 'hll')
-                        minReductionHashAggr: 0.99
-                        mode: hash
-                        outputColumnNames: _col0, _col1, _col2, _col3, _col4, 
_col5
+                    table:
+                        input format: 
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                        output format: 
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                        serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                        name: tpch_test.q18_large_volume_customer_cached
+                    Write Type: INSERT
+                  Select Operator
+                    expressions: _col0 (type: string), _col1 (type: bigint), 
_col2 (type: bigint), _col3 (type: string), _col4 (type: double), _col5 (type: 
double)
+                    outputColumnNames: col1, col2, col3, col4, col5, col6
+                    Statistics: Num rows: 100 Data size: 1500 Basic stats: 
COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: compute_stats(col1, 'hll'), 
compute_stats(col2, 'hll'), compute_stats(col3, 'hll'), compute_stats(col4, 
'hll'), compute_stats(col5, 'hll'), compute_stats(col6, 'hll')
+                      minReductionHashAggr: 0.99
+                      mode: hash
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, 
_col5
+                      Statistics: Num rows: 1 Data size: 2576 Basic stats: 
COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        null sort order: 
+                        sort order: 
                         Statistics: Num rows: 1 Data size: 2576 Basic stats: 
COMPLETE Column stats: NONE
-                        Reduce Output Operator
-                          null sort order: 
-                          sort order: 
-                          Statistics: Num rows: 1 Data size: 2576 Basic stats: 
COMPLETE Column stats: NONE
-                          value expressions: _col0 (type: 
struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>),
 _col2 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>),
 _col3 (type: 
struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>),
 _col4 [...]
-        Reducer 6 
+                        value expressions: _col0 (type: 
struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>),
 _col1 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>),
 _col2 (type: 
struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>),
 _col3 (type: 
struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>),
 _col4 ( [...]
+        Reducer 7 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
@@ -482,28 +519,6 @@ STAGE PLANS:
                       input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 9 
-            Execution mode: vectorized, llap
-            Reduce Operator Tree:
-              Group By Operator
-                aggregations: sum(VALUE._col0)
-                keys: KEY._col0 (type: bigint)
-                mode: mergepartial
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 2849995116 Data size: 43319925835 Basic 
stats: COMPLETE Column stats: NONE
-                Filter Operator
-                  predicate: (_col1 > 300.0D) (type: boolean)
-                  Statistics: Num rows: 949998372 Data size: 14439975278 Basic 
stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: _col0 (type: bigint)
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 949998372 Data size: 14439975278 
Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: bigint)
-                      null sort order: z
-                      sort order: +
-                      Map-reduce partition columns: _col0 (type: bigint)
-                      Statistics: Num rows: 949998372 Data size: 14439975278 
Basic stats: COMPLETE Column stats: NONE
 
   Stage: Stage-2
     Dependency Collection

Reply via email to