This is an automated email from the ASF dual-hosted git repository.

gparai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 3233d8aaff57ac71bd3b726efcd5fdaa92aef861
Author: Cliff Buchanan <[email protected]>
AuthorDate: Tue Oct 16 15:17:43 2018 -0700

    DRILL-1328: Support table statistics
---
 .../drill/exec/store/mapr/TableFormatPlugin.java   |   2 +-
 .../store/mapr/streams/StreamsFormatPlugin.java    |   2 +-
 exec/java-exec/pom.xml                             |   5 +
 exec/java-exec/src/main/codegen/data/Parser.tdd    |  28 +-
 .../src/main/codegen/includes/parserImpls.ftl      |  46 +++-
 .../apache/drill/exec/dotdrill/DotDrillType.java   |   5 +-
 .../apache/drill/exec/dotdrill/DotDrillUtil.java   |  14 +-
 .../exec/expr/fn/impl/StatisticsAggrFunctions.java | 285 +++++++++++++++++++++
 .../org/apache/drill/exec/ops/QueryContext.java    |   4 +
 .../physical/base/AbstractPhysicalVisitor.java     |  12 +
 .../drill/exec/physical/base/PhysicalVisitor.java  |   4 +
 .../exec/physical/config/StatisticsAggregate.java  |  66 +++++
 .../drill/exec/physical/config/UnpivotMaps.java    |  59 +++++
 .../physical/impl/aggregate/InternalBatch.java     |   6 +
 .../impl/aggregate/StatisticsAggBatch.java         | 223 ++++++++++++++++
 .../impl/aggregate/StatisticsAggBatchCreator.java  |  38 +++
 .../physical/impl/aggregate/StreamingAggBatch.java |  16 +-
 .../impl/unpivot/UnpivotMapsBatchCreator.java      |  38 +++
 .../impl/unpivot/UnpivotMapsRecordBatch.java       | 244 ++++++++++++++++++
 .../apache/drill/exec/planner/PlannerPhase.java    |   2 +
 .../exec/planner/common/DrillJoinRelBase.java      |  51 +++-
 .../drill/exec/planner/common/DrillStatsTable.java | 158 ++++++++++++
 .../planner/cost/DrillRelMdDistinctRowCount.java   |  52 ++++
 .../exec/planner/cost/DrillRelMdRowCount.java      |  19 ++
 .../exec/planner/logical/DrillAnalyzeRel.java      |  71 +++++
 .../drill/exec/planner/logical/DrillTable.java     |  10 +
 .../logical/FileSystemCreateTableEntry.java        |   6 +-
 .../drill/exec/planner/physical/AnalyzePrule.java  |  64 +++++
 .../drill/exec/planner/physical/StatsAggPrel.java  |  86 +++++++
 .../exec/planner/physical/UnpivotMapsPrel.java     |  85 ++++++
 .../planner/sql/handlers/AnalyzeTableHandler.java  | 157 ++++++++++++
 .../planner/sql/handlers/DefaultSqlHandler.java    |   2 +
 .../sql/parser/CompoundIdentifierConverter.java    |   1 +
 .../exec/planner/sql/parser/SqlAnalyzeTable.java   | 157 ++++++++++++
 .../apache/drill/exec/store/AbstractSchema.java    |  34 +++
 .../apache/drill/exec/store/SubSchemaWrapper.java  |  15 ++
 .../exec/store/dfs/FileSystemSchemaFactory.java    |  15 ++
 .../apache/drill/exec/store/dfs/FormatPlugin.java  |   2 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java     |  90 ++++++-
 .../exec/store/dfs/easy/EasyFormatPlugin.java      |   4 +-
 .../drill/exec/store/dfs/easy/EasyWriter.java      |  12 +-
 .../exec/store/easy/json/JSONFormatPlugin.java     |   2 +
 .../exec/store/easy/json/JsonRecordWriter.java     |   8 +-
 .../exec/store/parquet/ParquetFormatPlugin.java    |   8 +-
 .../drill/exec/store/parquet/ParquetWriter.java    |  12 +-
 .../org/apache/drill/exec/sql/TestAnalyze.java     | 105 ++++++++
 exec/jdbc-all/pom.xml                              |   4 +
 .../apache/drill/common/logical/data/Analyze.java  |  35 +++
 .../data/visitors/AbstractLogicalVisitor.java      |   6 +
 .../logical/data/visitors/LogicalVisitor.java      |   2 +
 .../org/apache/drill/exec/proto/UserBitShared.java |  29 ++-
 .../drill/exec/proto/beans/CoreOperatorType.java   |   6 +-
 protocol/src/main/protobuf/UserBitShared.proto     |   2 +
 53 files changed, 2356 insertions(+), 53 deletions(-)

diff --git 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
index aeb117a..1c30264 100644
--- 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
+++ 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
@@ -94,7 +94,7 @@ public abstract class TableFormatPlugin implements FormatPlugin {
 
   @Override
   public AbstractWriter getWriter(PhysicalOperator child, String location,
-      List<String> partitionColumns) throws IOException {
+      boolean append, List<String> partitionColumns) throws IOException {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
index 206954b..76466ab 100644
--- 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
+++ 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
@@ -64,7 +64,7 @@ public class StreamsFormatPlugin extends TableFormatPlugin {
 
   @Override
   public AbstractWriter getWriter(PhysicalOperator child, String location,
-      List<String> partitionColumns) throws IOException {
+      boolean append, List<String> partitionColumns) throws IOException {
     throw new UnsupportedOperationException();
   }
 
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index df150e5..d7f7393 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -352,6 +352,11 @@
       <artifactId>commons-compiler</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>2.7.0</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <exclusions>
diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd 
b/exec/java-exec/src/main/codegen/data/Parser.tdd
index 820ecb5..ec56af4 100644
--- a/exec/java-exec/src/main/codegen/data/Parser.tdd
+++ b/exec/java-exec/src/main/codegen/data/Parser.tdd
@@ -38,6 +38,30 @@
     "IF",
     "JAR",
     "PROPERTIES"
+    "ANALYZE",
+    "COMPUTE",
+    "ESTIMATE",
+    "STATISTICS",
+    "COLUMNS",
+    "SAMPLE"
+  ]
+
+  # List of keywords from "keywords" section that are not reserved by SQL:2003 standard.
+  # Example: "DATABASES", "TABLES" are keywords but are not reserved by SQL:2003 standard.
+  # First keyword that starts the statement should be a reserved keyword, otherwise the current parser
+  # ends up considering it as an expression and fails.
+  nonReservedKeywords: [
+    "DATABASES",
+    "REPLACE",
+    "SCHEMAS",
+    "TABLES",
+    "FILES",
+    "METADATA",
+    "COMPUTE",
+    "ESTIMATE",
+    "STATISTICS",
+    "COLUMNS",
+    "SAMPLE"
   ]
 
   # List of methods for parsing custom SQL statements.
@@ -52,7 +76,9 @@
     "SqlShowFiles()",
     "SqlRefreshMetadata()",
     "SqlCreateFunction()",
-    "SqlDropFunction()"
+    "SqlDropFunction()",
+    "SqlRefreshMetadata()",
+    "SqlAnalyzeTable()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl 
b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 8afc8f8..2606006 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -557,4 +557,48 @@ Pair<SqlNodeList, SqlNodeList> ParenthesizedCompoundIdentifierList() :
        return Pair.of(new SqlNodeList(list, getPos()), null);
     }
 }
-</#if>
\ No newline at end of file
+</#if>
+/**
+ * Parses an ANALYZE statement.
+ * ANALYZE TABLE tblname {COMPUTE | ESTIMATE} STATISTICS FOR
+ *      {ALL COLUMNS | COLUMNS (field1, field2, ...)} [ SAMPLE numeric PERCENT ]
+ */
+SqlNode SqlAnalyzeTable() :
+{
+    SqlParserPos pos;
+    SqlIdentifier tblName;
+    SqlLiteral estimate = null;
+    SqlNodeList fieldList = null;
+    SqlNumericLiteral percent = null;
+}
+{
+    <ANALYZE> { pos = getPos(); }
+    <TABLE>
+    tblName = CompoundIdentifier()
+    (
+        <COMPUTE> { estimate = SqlLiteral.createBoolean(false, pos); }
+        |
+        <ESTIMATE> { estimate = SqlLiteral.createBoolean(true, pos); }
+    )
+    <STATISTICS> <FOR>
+    (
+        ( <ALL> <COLUMNS> )
+        |
+        ( <COLUMNS> fieldList = ParseRequiredFieldList("Table") )
+    )
+    [
+        <SAMPLE> percent = UnsignedNumericLiteral() <PERCENT>
+        {
+            BigDecimal rate = percent.bigDecimalValue();
+            if (rate.compareTo(BigDecimal.ZERO) <= 0 ||
+                rate.compareTo(BigDecimal.valueOf(100L)) > 0)
+            {
+                throw new ParseException("Invalid percentage for ANALYZE TABLE");
+            }
+        }
+    ]
+    {
+        if (percent == null) { percent = SqlLiteral.createExactNumeric("100.0", pos); }
+        return new SqlAnalyzeTable(pos, tblName, estimate, fieldList, percent);
+    }
+}
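
For reference, statements accepted by the SqlAnalyzeTable() rule above, issued through Drill's JDBC driver. This is an illustrative sketch only, not part of the patch: the connection URL and the dfs.tmp.`sales` table are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class AnalyzeTableExample {
      public static void main(String[] args) throws Exception {
        // Placeholder connection URL; adjust to the target drillbit.
        try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement()) {
          // COMPUTE: exact statistics over every column of the table.
          stmt.execute("ANALYZE TABLE dfs.tmp.`sales` COMPUTE STATISTICS FOR ALL COLUMNS");
          // ESTIMATE over selected columns, sampling 10 percent of the rows;
          // the grammar rejects SAMPLE values outside (0, 100].
          stmt.execute("ANALYZE TABLE dfs.tmp.`sales` ESTIMATE STATISTICS FOR "
              + "COLUMNS (region_id, sales_city) SAMPLE 10 PERCENT");
        }
      }
    }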
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillType.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillType.java
index 673e1c7..589e982 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillType.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillType.java
@@ -21,10 +21,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 public enum DotDrillType {
-  VIEW;
+  VIEW,
   // ,FORMAT
-  // ,STATS
-
+  STATS;
 
   private final String ending;
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
index b6571df..32759d2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
@@ -17,19 +17,18 @@
  */
 package org.apache.drill.exec.dotdrill;
 
-import java.io.IOException;
 import java.io.FileNotFoundException;
-import java.util.List;
-import java.util.Arrays;
+import java.io.IOException;
 import java.util.ArrayList;
-
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
 public class DotDrillUtil {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DotDrillUtil.class);
 
@@ -42,6 +41,9 @@ public class DotDrillUtil {
    * @return List of matched DotDrillFile objects
    */
   private static List<DotDrillFile> getDrillFiles(DrillFileSystem fs, List<FileStatus> statuses, DotDrillType... types){
+    if (statuses == null) {
+      return Collections.emptyList();
+    }
     List<DotDrillFile> files = Lists.newArrayList();
     for(FileStatus s : statuses){
       DotDrillFile f = DotDrillFile.create(fs, s);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StatisticsAggrFunctions.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StatisticsAggrFunctions.java
new file mode 100644
index 0000000..c6430dd
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StatisticsAggrFunctions.java
@@ -0,0 +1,285 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+
+/*
+ * This class is automatically generated from AggrTypeFunctions2.tdd using FreeMarker.
+ */
+
+package org.apache.drill.exec.expr.fn.impl;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillAggFunc;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
+import org.apache.drill.exec.expr.holders.ObjectHolder;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+import javax.inject.Inject;
+
+@SuppressWarnings("unused")
+public class StatisticsAggrFunctions {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatisticsAggrFunctions.class);
+
+  @FunctionTemplate(name = "statcount", scope = 
FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+  public static class StatCount implements DrillAggFunc {
+    @Param FieldReader in;
+    @Workspace BigIntHolder count;
+    @Output NullableBigIntHolder out;
+
+    @Override
+    public void setup() {
+      count = new BigIntHolder();
+    }
+
+    @Override
+    public void add() {
+      count.value++;
+    }
+
+    @Override
+    public void output() {
+      out.isSet = 1;
+      out.value = count.value;
+    }
+
+    @Override
+    public void reset() {
+      count.value = 0;
+    }
+  }
+
+  @FunctionTemplate(name = "nonnullstatcount", scope = 
FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+  public static class NonNullStatCount implements DrillAggFunc {
+    @Param FieldReader in;
+    @Workspace BigIntHolder count;
+    @Output NullableBigIntHolder out;
+
+    @Override
+    public void setup() {
+      count = new BigIntHolder();
+    }
+
+    @Override
+    public void add() {
+      if (in.isSet()) {
+        count.value++;
+      }
+    }
+
+    @Override
+    public void output() {
+      out.isSet = 1;
+      out.value = count.value;
+    }
+
+    @Override
+    public void reset() {
+      count.value = 0;
+    }
+  }
+
+  @FunctionTemplate(name = "hll", scope = 
FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+  public static class HllFieldReader implements DrillAggFunc {
+    @Param FieldReader in;
+    @Workspace ObjectHolder work;
+    @Output NullableVarBinaryHolder out;
+    @Inject DrillBuf buffer;
+
+    @Override
+    public void setup() {
+      work = new ObjectHolder();
+      work.obj = new com.clearspring.analytics.stream.cardinality.HyperLogLog(10);
+    }
+
+    @Override
+    public void add() {
+      if (work.obj != null) {
+        com.clearspring.analytics.stream.cardinality.HyperLogLog hll =
+            (com.clearspring.analytics.stream.cardinality.HyperLogLog) work.obj;
+        int mode = in.getType().getMode().getNumber();
+        int type = in.getType().getMinorType().getNumber();
+
+        switch (mode) {
+          case org.apache.drill.common.types.TypeProtos.DataMode.OPTIONAL_VALUE:
+            if (!in.isSet()) {
+              hll.offer(null);
+              break;
+            }
+            // fall through //
+          case org.apache.drill.common.types.TypeProtos.DataMode.REQUIRED_VALUE:
+            switch (type) {
+              case org.apache.drill.common.types.TypeProtos.MinorType.VARCHAR_VALUE:
+                hll.offer(in.readText().toString());
+                break;
+              default:
+                work.obj = null;
+            }
+            break;
+          default:
+            work.obj = null;
+        }
+      }
+    }
+
+    @Override
+    public void output() {
+      if (work.obj != null) {
+        com.clearspring.analytics.stream.cardinality.HyperLogLog hll =
+            (com.clearspring.analytics.stream.cardinality.HyperLogLog) work.obj;
+
+        try {
+          byte[] ba = hll.getBytes();
+          out.buffer = buffer.reallocIfNeeded(ba.length);
+          out.start = 0;
+          out.end = ba.length;
+          out.buffer.setBytes(0, ba);
+          out.isSet = 1;
+        } catch (java.io.IOException e) {
+          throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get HyperLogLog output", e);
+        }
+      } else {
+        out.isSet = 0;
+      }
+    }
+
+    @Override
+    public void reset() {
+      work.obj = new com.clearspring.analytics.stream.cardinality.HyperLogLog(10);
+    }
+  }
+
+
+  @FunctionTemplate(name = "ndv", scope = 
FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+  public static class NdvVarBinary implements DrillAggFunc {
+    @Param
+    FieldReader in;
+    @Workspace
+    ObjectHolder work;
+    @Output
+    NullableBigIntHolder out;
+
+    @Override
+    public void setup() {
+      work = new ObjectHolder();
+      work.obj = new com.clearspring.analytics.stream.cardinality.HyperLogLog(10);
+    }
+
+    @Override
+    public void add() {
+      if (work.obj != null) {
+        com.clearspring.analytics.stream.cardinality.HyperLogLog hll =
+            (com.clearspring.analytics.stream.cardinality.HyperLogLog) work.obj;
+        int mode = in.getType().getMode().getNumber();
+        int type = in.getType().getMinorType().getNumber();
+
+        switch (mode) {
+          case org.apache.drill.common.types.TypeProtos.DataMode.OPTIONAL_VALUE:
+            if (!in.isSet()) {
+              hll.offer(null);
+              break;
+            }
+            // fall through //
+          case org.apache.drill.common.types.TypeProtos.DataMode.REQUIRED_VALUE:
+            switch (type) {
+              case org.apache.drill.common.types.TypeProtos.MinorType.VARCHAR_VALUE:
+                hll.offer(in.readText().toString());
+                break;
+              case org.apache.drill.common.types.TypeProtos.MinorType.FLOAT8_VALUE:
+                hll.offer(in.readDouble());
+                break;
+              case org.apache.drill.common.types.TypeProtos.MinorType.INT_VALUE:
+                hll.offer(in.readInteger());
+                break;
+              case org.apache.drill.common.types.TypeProtos.MinorType.BIGINT_VALUE:
+                hll.offer(in.readLong());
+                break;
+              case org.apache.drill.common.types.TypeProtos.MinorType.DATE_VALUE:
+              case org.apache.drill.common.types.TypeProtos.MinorType.TIMESTAMP_VALUE:
+              case org.apache.drill.common.types.TypeProtos.MinorType.TIME_VALUE:
+              case org.apache.drill.common.types.TypeProtos.MinorType.TIMETZ_VALUE:
+                hll.offer(in.readLocalDateTime());
+                break;
+              case org.apache.drill.common.types.TypeProtos.MinorType.VARBINARY_VALUE:
+                hll.offer(in.readByteArray());
+                break;
+              default:
+                work.obj = null;
+            }
+            break;
+          default:
+            work.obj = null;
+        }
+      }
+    }
+
+    @Override
+    public void output() {
+      if (work.obj != null) {
+        com.clearspring.analytics.stream.cardinality.HyperLogLog hll =
+            (com.clearspring.analytics.stream.cardinality.HyperLogLog) work.obj;
+
+        out.isSet = 1;
+        out.value = hll.cardinality();
+      } else {
+        out.isSet = 0;
+      }
+    }
+
+    @Override
+    public void reset() {
+      work.obj = new com.clearspring.analytics.stream.cardinality.HyperLogLog(10);
+    }
+  }
+
+
+  @FunctionTemplate(name = "hll_decode", scope = FunctionScope.SIMPLE, nulls = 
NullHandling.NULL_IF_NULL)
+  public static class HllDecode implements DrillSimpleFunc {
+
+    @Param
+    NullableVarBinaryHolder in;
+    @Output
+    BigIntHolder out;
+
+    @Override
+    public void setup() {
+    }
+
+    public void eval() {
+      out.value = -1;
+
+      if (in.isSet != 0) {
+        byte[] din = new byte[in.end - in.start];
+        in.buffer.getBytes(in.start, din);
+        try {
+          out.value = com.clearspring.analytics.stream.cardinality.HyperLogLog.Builder.build(din).cardinality();
+        } catch (java.io.IOException e) {
+          throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failure evaluating hll_decode", e);
+        }
+      }
+    }
+  }
+
+}
\ No newline at end of file
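
The hll and ndv aggregates above, together with hll_decode, wrap the stream-lib HyperLogLog sketch pulled in through the new com.clearspring.analytics:stream dependency. A minimal standalone sketch of that round trip follows; the log2m value of 10 mirrors the code above, while the data values are invented.

    import com.clearspring.analytics.stream.cardinality.HyperLogLog;

    public class HllRoundTripExample {
      public static void main(String[] args) throws Exception {
        HyperLogLog hll = new HyperLogLog(10);          // same precision as the functions above
        for (int i = 0; i < 10_000; i++) {
          hll.offer("value-" + (i % 1_000));            // 1,000 distinct values offered repeatedly
        }
        // "ndv" reads the estimate directly from the sketch.
        System.out.println("ndv estimate: " + hll.cardinality());

        // "hll" serializes the sketch; "hll_decode" rebuilds it and reads the estimate back.
        byte[] serialized = hll.getBytes();
        HyperLogLog decoded = HyperLogLog.Builder.build(serialized);
        System.out.println("decoded estimate: " + decoded.cardinality());
      }
    }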
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index c3e6bda..d770636 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -288,6 +288,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
     return new PartitionExplorerImpl(getRootSchema());
   }
 
+  public DrillbitContext getDrillbitContext() {
+    return drillbitContext;
+  }
+
   @Override
   public ValueHolder getConstantValueHolder(String value, MinorType type, Function<DrillBuf, ValueHolder> holderInitializer) {
     if (!constantValueHolderCache.containsKey(value)) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 262e7e5..85d2a2b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -35,11 +35,13 @@ import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StatisticsAggregate;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.config.UnnestPOP;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.physical.config.UnpivotMaps;
 import org.apache.drill.exec.physical.config.Values;
 import org.apache.drill.exec.physical.config.WindowPOP;
 
@@ -97,6 +99,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
+  public T visitStatisticsAggregate(StatisticsAggregate agg, X value) throws E {
+    return visitOp(agg, value);
+  }
+
+  @Override
   public T visitHashAggregate(HashAggregate agg, X value) throws E {
     return visitOp(agg, value);
   }
@@ -215,6 +222,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
+  public T visitUnpivot(UnpivotMaps op, X value) throws E {
+    return visitOp(op, value);
+  }
+
+  @Override
   public T visitOp(PhysicalOperator op, X value) throws E{
     throw new UnsupportedOperationException(String.format(
         "The PhysicalVisitor of type %s does not currently support visiting 
the PhysicalOperator type %s.", this
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 77a87c2..a21f578 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -35,11 +35,13 @@ import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StatisticsAggregate;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.config.UnnestPOP;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.physical.config.UnpivotMaps;
 import org.apache.drill.exec.physical.config.Values;
 import org.apache.drill.exec.physical.config.WindowPOP;
 
@@ -68,8 +70,10 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitRowKeyJoin(RowKeyJoinPOP join, EXTRA value) throws EXCEP;
   public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;
   public RETURN visitStreamingAggregate(StreamingAggregate agg, EXTRA value) throws EXCEP;
+  public RETURN visitStatisticsAggregate(StatisticsAggregate agg, EXTRA value) throws EXCEP;
   public RETURN visitHashAggregate(HashAggregate agg, EXTRA value) throws EXCEP;
   public RETURN visitWriter(Writer op, EXTRA value) throws EXCEP;
+  public RETURN visitUnpivot(UnpivotMaps op, EXTRA value) throws EXCEP;
   public RETURN visitValues(Values op, EXTRA value) throws EXCEP;
   public RETURN visitOp(PhysicalOperator op, EXTRA value) throws EXCEP;
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StatisticsAggregate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StatisticsAggregate.java
new file mode 100644
index 0000000..95ee6bf
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StatisticsAggregate.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.List;
+
+@JsonTypeName("statistics-aggregate")
+public class StatisticsAggregate extends StreamingAggregate {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatisticsAggregate.class);
+
+  private final List<String> functions;
+
+  @JsonCreator
+  public StatisticsAggregate(
+      @JsonProperty("child") PhysicalOperator child,
+      @JsonProperty("functions") List<String> functions) {
+    super(child, null, null, 0.f);
+    this.functions = ImmutableList.copyOf(functions);
+  }
+
+  public List<String> getFunctions() {
+    return functions;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value)
+      throws E {
+    return physicalVisitor.visitStatisticsAggregate(this, value);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new StatisticsAggregate(child, functions);
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.STATISTICS_AGGREGATE_VALUE;
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnpivotMaps.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnpivotMaps.java
new file mode 100644
index 0000000..ac71b11
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnpivotMaps.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("unpivot-maps")
+public class UnpivotMaps extends AbstractSingle {
+  private final List<String> mapFieldsNames;
+
+  @JsonCreator
+  public UnpivotMaps(@JsonProperty("child") PhysicalOperator child, List<String> mapFieldsNames) {
+    super(child);
+    this.mapFieldsNames = mapFieldsNames;
+  }
+
+  public List<String> getMapFieldNames() {
+    return mapFieldsNames;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitUnpivot(this, value);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new UnpivotMaps(child, mapFieldsNames);
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.UNPIVOT_MAPS_VALUE;
+  }
+}
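
For orientation, a self-contained sketch of the transformation this operator configures and UnpivotMapsRecordBatch (further below) carries out: per-function maps keyed by column name become one output row per column. Plain java.util maps stand in for Drill's map vectors here, and the numbers are invented.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class UnpivotMapsSketch {
      public static void main(String[] args) {
        // Incoming shape: one map per statistics function, keyed by column name.
        Map<String, Long> statcount = new LinkedHashMap<>();
        statcount.put("region_id", 100L);
        statcount.put("sales_city", 100L);

        Map<String, Long> nonnullstatcount = new LinkedHashMap<>();
        nonnullstatcount.put("region_id", 98L);
        nonnullstatcount.put("sales_city", 95L);

        // Unpivoted shape: one row per column, one output field per incoming map.
        System.out.println("column, statcount, nonnullstatcount");
        for (String column : statcount.keySet()) {
          System.out.println(column + ", " + statcount.get(column) + ", " + nonnullstatcount.get(column));
        }
      }
    }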
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 1d1d3cb..396fd36 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.util.Iterator;
 
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
@@ -31,6 +32,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InternalBatch.class);
 
   private final VectorContainer container;
+  private final RecordBatch incoming;
   private final BatchSchema schema;
   private final SelectionVector2 sv2;
   private final SelectionVector4 sv4;
@@ -54,6 +56,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
       this.sv2 = null;
     }
     this.schema = incoming.getSchema();
+    this.incoming = incoming;
     this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers, oContext);
   }
 
@@ -88,4 +91,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
     return container.getValueAccessorById(clazz, fieldIds);
   }
 
+  public FragmentContext getContext() {
+    return incoming.getContext();
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatch.java
new file mode 100644
index 0000000..ea5a7b3
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatch.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.FunctionCallFactory;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.StatisticsAggregate;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.FieldIdUtil;
+import org.apache.drill.exec.vector.complex.MapVector;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * TODO: This needs cleanup. Currently the key values are constants and we compare the constants for
+ * every record. Seems unnecessary.
+ *
+ * Example input and output:
+ * Schema of incoming batch: region_id (VARCHAR), sales_city (VARCHAR), cnt (BIGINT)
+ * Schema of output:
+ *    "schema" : BIGINT - Schema number. For each schema change this number is incremented.
+ *    "computed" : BIGINT - Time at which the statistics are computed.
+ *    "columns" : MAP - Column names
+ *       "region_id" : VARCHAR
+ *       "sales_city" : VARCHAR
+ *       "cnt" : VARCHAR
+ *    "statscount" : MAP
+ *       "region_id" : BIGINT - statscount(region_id) - aggregation over all values of region_id in incoming batch
+ *       "sales_city" : BIGINT - statscount(sales_city)
+ *       "cnt" : BIGINT - statscount(cnt)
+ *    "nonnullstatcount" : MAP
+ *       "region_id" : BIGINT - nonnullstatcount(region_id)
+ *       "sales_city" : BIGINT - nonnullstatcount(sales_city)
+ *       "cnt" : BIGINT - nonnullstatcount(cnt)
+ *   .... another map for next stats function ....
+ */
+public class StatisticsAggBatch extends StreamingAggBatch {
+  private List<String> functions;
+  private int schema = 0;
+
+  public StatisticsAggBatch(StatisticsAggregate popConfig, RecordBatch incoming, FragmentContext context)
+      throws OutOfMemoryException {
+    super(popConfig, incoming, context);
+    this.functions = popConfig.getFunctions();
+  }
+
+  private void createKeyColumn(String name, LogicalExpression expr, List<LogicalExpression> keyExprs, List<TypedFieldId> keyOutputIds)
+      throws SchemaChangeException {
+    ErrorCollector collector = new ErrorCollectorImpl();
+
+    LogicalExpression mle = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry());
+
+    MaterializedField outputField = MaterializedField.create(name, mle.getMajorType());
+    ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+
+    keyExprs.add(mle);
+    keyOutputIds.add(container.add(vector));
+
+    if (collector.hasErrors()) {
+      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+    }
+  }
+
+  private void createNestedKeyColumn(MapVector parent, String name, LogicalExpression expr, List<LogicalExpression> keyExprs, List<TypedFieldId> keyOutputIds)
+      throws SchemaChangeException {
+    ErrorCollector collector = new ErrorCollectorImpl();
+
+    LogicalExpression mle = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry());
+
+    Class<? extends ValueVector> vvc =
+        TypeHelper.getValueVectorClass(mle.getMajorType().getMinorType(), mle.getMajorType().getMode());
+
+    ValueVector vv = parent.addOrGet(name, mle.getMajorType(), vvc);
+
+    TypedFieldId pfid = container.getValueVectorId(SchemaPath.getSimplePath(parent.getField().getName()));
+    assert pfid.getFieldIds().length == 1;
+    TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+    builder.addId(pfid.getFieldIds()[0]);
+    TypedFieldId id =
+        FieldIdUtil.getFieldIdIfMatches(parent, builder, true,
+            SchemaPath.getSimplePath(vv.getField().getName()).getRootSegment());
+
+    keyExprs.add(mle);
+    keyOutputIds.add(id);
+
+    if (collector.hasErrors()) {
+      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+    }
+  }
+
+  private void addMapVector(String name, MapVector parent, LogicalExpression expr, List<LogicalExpression> valueExprs)
+      throws SchemaChangeException {
+    ErrorCollector collector = new ErrorCollectorImpl();
+
+    LogicalExpression mle = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry());
+
+    Class<? extends ValueVector> vvc =
+        TypeHelper.getValueVectorClass(mle.getMajorType().getMinorType(), mle.getMajorType().getMode());
+    ValueVector vv = parent.addOrGet(name, mle.getMajorType(), vvc);
+
+    TypedFieldId pfid = container.getValueVectorId(SchemaPath.getSimplePath(parent.getField().getName()));
+    assert pfid.getFieldIds().length == 1;
+    TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+    builder.addId(pfid.getFieldIds()[0]);
+    TypedFieldId id = FieldIdUtil.getFieldIdIfMatches(parent, builder, true,
+        SchemaPath.getSimplePath(vv.getField().getName()).getRootSegment());
+
+    valueExprs.add(new ValueVectorWriteExpression(id, mle, true));
+
+    if (collector.hasErrors()) {
+      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+    }
+  }
+
+  private StreamingAggregator codegenAggregator(List<LogicalExpression> keyExprs, List<LogicalExpression> valueExprs, List<TypedFieldId> keyOutputIds)
+      throws SchemaChangeException, ClassTransformationException, IOException {
+    ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
+
+    LogicalExpression[] keyExprsArray = new LogicalExpression[keyExprs.size()];
+    LogicalExpression[] valueExprsArray = new LogicalExpression[valueExprs.size()];
+    TypedFieldId[] keyOutputIdsArray = new TypedFieldId[keyOutputIds.size()];
+
+    keyExprs.toArray(keyExprsArray);
+    valueExprs.toArray(valueExprsArray);
+    keyOutputIds.toArray(keyOutputIdsArray);
+
+    setupIsSame(cg, keyExprsArray);
+    setupIsSameApart(cg, keyExprsArray);
+    addRecordValues(cg, valueExprsArray);
+    outputRecordKeys(cg, keyOutputIdsArray, keyExprsArray);
+    outputRecordKeysPrev(cg, keyOutputIdsArray, keyExprsArray);
+
+    cg.getBlock("resetValues")._return(JExpr.TRUE);
+    getIndex(cg);
+
+    container.buildSchema(SelectionVectorMode.NONE);
+    StreamingAggregator agg = context.getImplementationClass(cg);
+    agg.setup(oContext, incoming, this, ValueVector.MAX_ROW_COUNT);
+    return agg;
+  }
+
+  protected StreamingAggregator createAggregatorInternal()
+      throws SchemaChangeException, ClassTransformationException, IOException {
+    container.clear();
+
+    List<LogicalExpression> keyExprs = Lists.newArrayList();
+    List<LogicalExpression> valueExprs = Lists.newArrayList();
+    List<TypedFieldId> keyOutputIds = Lists.newArrayList();
+
+    createKeyColumn("schema",
+        ValueExpressions.getBigInt(schema++),
+        keyExprs,
+        keyOutputIds
+    );
+    createKeyColumn("computed",
+        ValueExpressions.getBigInt(System.currentTimeMillis()),
+        keyExprs,
+        keyOutputIds
+    );
+
+    MapVector cparent = new MapVector("column", oContext.getAllocator(), null);
+    container.add(cparent);
+    for (MaterializedField mf : incoming.getSchema()) {
+      createNestedKeyColumn(
+          cparent,
+          mf.getName(),
+          ValueExpressions.getChar(mf.getName(), 0),
+          keyExprs,
+          keyOutputIds
+      );
+    }
+
+    for (String func : functions) {
+      MapVector parent = new MapVector(func, oContext.getAllocator(), null);
+      container.add(parent);
+
+      for (MaterializedField mf : incoming.getSchema()) {
+        List<LogicalExpression> args = Lists.newArrayList();
+        args.add(SchemaPath.getSimplePath(mf.getName()));
+        LogicalExpression call = FunctionCallFactory.createExpression(func, args);
+
+        addMapVector(mf.getName(), parent, call, valueExprs);
+      }
+    }
+
+    return codegenAggregator(keyExprs, valueExprs, keyOutputIds);
+  }
+}
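
To make createAggregatorInternal() above concrete: for the example schema in the class comment it wires up one aggregate call per configured function per incoming column, roughly the computation assembled by this hedged sketch. The table name is a placeholder; statcount, nonnullstatcount and ndv are the aggregates defined in StatisticsAggrFunctions.

    public class StatsQuerySketch {
      public static void main(String[] args) {
        String[] columns = {"region_id", "sales_city", "cnt"};          // example schema from the javadoc
        String[] functions = {"statcount", "nonnullstatcount", "ndv"};  // would come from StatisticsAggregate.getFunctions()
        StringBuilder sql = new StringBuilder("SELECT ");
        for (String fn : functions) {
          for (String col : columns) {
            sql.append(fn).append('(').append(col).append("), ");
          }
        }
        sql.setLength(sql.length() - 2);                 // drop the trailing ", "
        sql.append(" FROM dfs.tmp.`sales`");             // placeholder table
        System.out.println(sql);
      }
    }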
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatchCreator.java
new file mode 100644
index 0000000..aba325c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatchCreator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.StatisticsAggregate;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+@SuppressWarnings("unused")
+public class StatisticsAggBatchCreator implements BatchCreator<StatisticsAggregate>{
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, StatisticsAggregate config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 1);
+    return new StatisticsAggBatch(config, children.iterator().next(), context);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ffcfa78..e1e43bd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -77,8 +77,8 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
 
-  private StreamingAggregator aggregator;
-  private final RecordBatch incoming;
+  protected StreamingAggregator aggregator;
+  protected final RecordBatch incoming;
   private List<BaseWriter.ComplexWriter> complexWriters;
   //
   // Streaming agg can be in (a) a normal pipeline or (b) it may be in a pipeline that is part of a subquery involving
@@ -533,7 +533,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
   private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);
 
-  private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
+  protected void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
     cg.setMappingSet(IS_SAME_I1);
     for (final LogicalExpression expr : keyExprs) {
       // first, we rewrite the evaluation stack for each side of the comparison.
@@ -556,7 +556,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
   private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
 
-  private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
+  protected void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
     cg.setMappingSet(ISA_B1);
     for (final LogicalExpression expr : keyExprs) {
       // first, we rewrite the evaluation stack for each side of the comparison.
@@ -578,7 +578,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
   private final MappingSet EVAL = new MappingSet("index", "outIndex", "incoming", "outgoing", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
 
-  private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) {
+  protected void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) {
     cg.setMappingSet(EVAL);
     for (final LogicalExpression ex : valueExprs) {
       cg.addExpr(ex);
@@ -587,7 +587,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null));
 
-  private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
+  protected void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
     cg.setMappingSet(RECORD_KEYS);
     for (int i = 0; i < keyExprs.length; i++) {
       cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true));
@@ -600,7 +600,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private final GeneratorMapping PREVIOUS_KEYS = GeneratorMapping.create("outputRecordKeysPrev", "outputRecordKeysPrev", null, null);
   private final MappingSet RECORD_KEYS_PREV = new MappingSet("previousIndex", "outIndex", "previous", null, PREVIOUS_KEYS, PREVIOUS_KEYS);
 
-  private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
+  protected void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
     cg.setMappingSet(RECORD_KEYS_PREV);
 
     for (int i = 0; i < keyExprs.length; i++) {
@@ -614,7 +614,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     }
   }
 
-  private void getIndex(ClassGenerator<StreamingAggregator> g) {
+  protected void getIndex(ClassGenerator<StreamingAggregator> g) {
     switch (incoming.getSchema().getSelectionVectorMode()) {
     case FOUR_BYTE: {
       JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class));
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsBatchCreator.java
new file mode 100644
index 0000000..733524f
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsBatchCreator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unpivot;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.UnpivotMaps;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+@SuppressWarnings("unused")
+public class UnpivotMapsBatchCreator implements BatchCreator<UnpivotMaps>{
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, UnpivotMaps config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 1);
+    return new UnpivotMapsRecordBatch(config, children.iterator().next(), context);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
new file mode 100644
index 0000000..e98d70e
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unpivot;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.UnpivotMaps;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+/**
+ * TODO: This needs cleanup, especially in state transitions.
+ *
+ * Unpivot maps. Assumptions are:
+ *  1) all child vectors in a map are of the same type.
+ *  2) Each map contains the same number of fields and the field names are also the same (types could be different).
+ *
+ * Example input and output:
+ * Schema of input:
+ *    "schema" : BIGINT - Schema number. For each schema change this number is 
incremented.
+ *    "computed" : BIGINT - What time is it computed?
+ *    "columns" : MAP - Column names
+ *       "region_id" : VARCHAR
+ *       "sales_city" : VARCHAR
+ *       "cnt" : VARCHAR
+ *    "statscount" : MAP
+ *       "region_id" : BIGINT - statscount(region_id) - aggregation over all 
values of region_id in incoming batch
+ *       "sales_city" : BIGINT - statscount(sales_city)
+ *       "cnt" : BIGINT - statscount(cnt)
+ *    "nonnullstatcount" : MAP
+ *       "region_id" : BIGINT - nonnullstatcount(region_id)
+ *       "sales_city" : BIGINT - nonnullstatcount(sales_city)
+ *       "cnt" : BIGINT - nonnullstatcount(cnt)
+ *   .... another map for next stats function ....
+ *
+ * Schema of output:
+ *  "schema" : BIGINT - Schema number. For each schema change this number is 
incremented.
+ *  "computed" : BIGINT - What time is this computed?
+ *  "column" : column name
+ *  "statscount" : BIGINT
+ *  "nonnullstatcount" : BIGINT
+ *  .... one column for each map type ...
+ */
+public class UnpivotMapsRecordBatch extends AbstractSingleRecordBatch<UnpivotMaps> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnpivotMapsRecordBatch.class);
+
+  private final List<String> mapFieldsNames;
+
+  private int keyIndex = 0;
+  private List<String> keyList = null;
+
+  private Map<MaterializedField, Map<String, ValueVector>> dataSrcVecMap = null;
+
+  // Map of non-map fields to VV in the incoming schema
+  private Map<MaterializedField, ValueVector> copySrcVecMap = null;
+
+  private List<TransferPair> transferList;
+  private int recordCount = 0;
+
+  public UnpivotMapsRecordBatch(UnpivotMaps pop, RecordBatch incoming, 
FragmentContext context)
+      throws OutOfMemoryException {
+    super(pop, context, incoming);
+    this.mapFieldsNames = pop.getMapFieldNames();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  @Override
+  public IterOutcome innerNext() {
+    if (keyIndex != 0) {
+      doWork();
+      return IterOutcome.OK;
+    } else {
+      return super.innerNext();
+    }
+  }
+
+  public VectorContainer getOutgoingContainer() {
+    return this.container;
+  }
+
+  private void doTransfer() {
+    final int inputCount = incoming.getRecordCount();
+
+    for (TransferPair tp : transferList) {
+      tp.splitAndTransfer(0, inputCount);
+    }
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+    int outRecordCount = incoming.getRecordCount();
+
+    prepareTransfers();
+    doTransfer();
+
+    keyIndex = (keyIndex + 1) % keyList.size();
+    recordCount = outRecordCount;
+
+    if (keyIndex == 0) {
+      for (VectorWrapper w : incoming) {
+        w.clear();
+      }
+    }
+    return IterOutcome.OK;
+  }
+
+  /**
+   * Identify the list of fields within a map that are unpivoted as columns in the output.
+   */
+  private void buildKeyList() {
+    List<String> lastMapKeyList = null;
+    for (VectorWrapper<?> vw : incoming) {
+      if (vw.getField().getType().getMinorType() != MinorType.MAP) {
+        continue;
+      }
+
+      keyList = Lists.newArrayList();
+
+      for (ValueVector vv : vw.getValueVector()) {
+        keyList.add(vv.getField().getName());
+      }
+
+      if (lastMapKeyList == null) {
+        lastMapKeyList = keyList;
+      } else {
+        if (keyList.size() != lastMapKeyList.size() || 
!lastMapKeyList.containsAll(keyList)) {
+          throw new UnsupportedOperationException("Maps have different 
fields");
+        }
+      }
+    }
+  }
+
+  private void buildOutputContainer() {
+    dataSrcVecMap = Maps.newHashMap();
+    copySrcVecMap = Maps.newHashMap();
+    for (VectorWrapper<?> vw : incoming) {
+      MaterializedField ds = vw.getField();
+      String col = vw.getField().getName();
+
+      if (!mapFieldsNames.contains(col)) {
+        MajorType mt = vw.getValueVector().getField().getType();
+        MaterializedField mf = MaterializedField.create(col, mt);
+        container.add(TypeHelper.getNewVector(mf, oContext.getAllocator()));
+        copySrcVecMap.put(mf, vw.getValueVector());
+        continue;
+      }
+
+      MapVector mapVector = (MapVector) vw.getValueVector();
+      assert mapVector.getPrimitiveVectors().size() > 0;
+
+      MajorType mt = mapVector.iterator().next().getField().getType();
+      MaterializedField mf = MaterializedField.create(col, mt);
+      assert !dataSrcVecMap.containsKey(mf);
+      container.add(TypeHelper.getNewVector(mf, oContext.getAllocator()));
+
+      Map<String, ValueVector> m = Maps.newHashMap();
+      dataSrcVecMap.put(mf, m);
+
+      for (ValueVector vv : mapVector) {
+        String fieldName = vv.getField().getName();
+
+        if (!keyList.contains(fieldName)) {
+          throw new UnsupportedOperationException("Unpivot data vector " +
+              ds + " contains key " + fieldName + " not contained in key 
source!");
+        }
+
+        if (vv.getField().getType().getMinorType() == MinorType.MAP) {
+          throw new UnsupportedOperationException("Unpivot of nested map is 
not supported!");
+        }
+
+        m.put(fieldName, vv);
+      }
+    }
+
+    container.buildSchema(incoming.getSchema().getSelectionVectorMode());
+  }
+
+  private void prepareTransfers() {
+    transferList = Lists.newArrayList();
+    for (VectorWrapper<?> vw : container) {
+      MaterializedField mf = vw.getField();
+
+      ValueVector vv;
+      TransferPair tp;
+      if (dataSrcVecMap.containsKey(mf)) {
+        String k = keyList.get(keyIndex);
+        vv = dataSrcVecMap.get(mf).get(k);
+        tp = vv.makeTransferPair(vw.getValueVector());
+      } else {
+        vv = copySrcVecMap.get(mf);
+        tp = vv.makeTransferPair(vw.getValueVector());
+      }
+
+      transferList.add(tp);
+    }
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    container.clear();
+
+    buildKeyList();
+    buildOutputContainer();
+    return true;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("UnpivotMapsRecordbatch[recordCount={}, container={}]", 
recordCount, container);
+  }
+}
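Illustrative sketch (not part of the diff): a minimal, self-contained Java example of the unpivot
rotation described in the class Javadoc above, using plain maps in place of value vectors. The field
names mirror the example schema; the counts are made-up values. The operator performs the same
rotation with TransferPairs, producing one output pass per entry in keyList.

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class UnpivotSketch {
      public static void main(String[] args) {
        // One entry per aggregate "map" column in the incoming batch.
        Map<String, Map<String, Long>> statsMaps = new LinkedHashMap<>();
        statsMaps.put("statscount", Map.of("region_id", 110L, "sales_city", 110L, "cnt", 110L));
        statsMaps.put("nonnullstatcount", Map.of("region_id", 108L, "sales_city", 110L, "cnt", 107L));

        // keyList is derived from the child fields of the map columns (see buildKeyList()).
        List<String> keyList = List.of("region_id", "sales_city", "cnt");

        // One output row per key: the column name plus one value pulled from each stats map.
        for (String key : keyList) {
          StringBuilder row = new StringBuilder("column=" + key);
          for (Map.Entry<String, Map<String, Long>> e : statsMaps.entrySet()) {
            row.append(", ").append(e.getKey()).append("=").append(e.getValue().get(key));
          }
          System.out.println(row);
        }
      }
    }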
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 6cdb18d..b6b0f58 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -60,6 +60,7 @@ import org.apache.drill.exec.planner.logical.DrillValuesRule;
 import org.apache.drill.exec.planner.logical.DrillWindowRule;
 import org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule;
 import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
+import org.apache.drill.exec.planner.physical.AnalyzePrule;
 import org.apache.drill.exec.planner.physical.ConvertCountToDirectScan;
 import org.apache.drill.exec.planner.physical.LateralJoinPrule;
 import org.apache.drill.exec.planner.physical.DirectScanPrule;
@@ -516,6 +517,7 @@ public enum PlannerPhase {
     ruleList.add(ValuesPrule.INSTANCE);
     ruleList.add(DirectScanPrule.INSTANCE);
     ruleList.add(RowKeyJoinPrule.INSTANCE);
+    ruleList.add(AnalyzePrule.INSTANCE);
 
     ruleList.add(UnnestPrule.INSTANCE);
     ruleList.add(LateralJoinPrule.INSTANCE);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
index cde49e4..42fbedb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
@@ -20,27 +20,28 @@ package org.apache.drill.exec.planner.common;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.holders.IntHolder;
-import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
@@ -100,9 +101,31 @@ public abstract class DrillJoinRelBase extends Join 
implements DrillJoin {
   public double estimateRowCount(RelMetadataQuery mq) {
     if (this.condition.isAlwaysTrue()) {
       return joinRowFactor * this.getLeft().estimateRowCount(mq) * 
this.getRight().estimateRowCount(mq);
-    } else {
-      return joinRowFactor * Math.max(this.getLeft().estimateRowCount(mq), 
this.getRight().estimateRowCount(mq));
     }
+
+    int[] joinFields = new int[2];
+
+    LogicalJoin jr = LogicalJoin.create(this.getLeft(), this.getRight(), 
this.getCondition(),
+            this.getVariablesSet(), this.getJoinType());
+
+    if (RelOptUtil.analyzeSimpleEquiJoin(jr, joinFields)) {
+      ImmutableBitSet leq = ImmutableBitSet.of(joinFields[0]);
+      ImmutableBitSet req = ImmutableBitSet.of(joinFields[1]);
+
+      Double ldrc = mq.getDistinctRowCount(this.getLeft(), leq, null);
+      Double rdrc = mq.getDistinctRowCount(this.getRight(), req, null);
+
+      Double lrc = mq.getRowCount(this.getLeft());
+      Double rrc = mq.getRowCount(this.getRight());
+
+      if (ldrc != null && rdrc != null && lrc != null && rrc != null) {
+        return (lrc * rrc) / Math.max(ldrc, rdrc);
+      }
+    }
+
+    return joinRowFactor * Math.max(
+        mq.getRowCount(left),
+        mq.getRowCount(right));
   }
 
   /**
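Worked example (not part of the diff, hypothetical figures): with the new equi-join estimate above, a
left input of 1,000,000 rows with 50,000 distinct join-key values joined to a right input of 10,000
rows with 10,000 distinct values is estimated at (1,000,000 * 10,000) / max(50,000, 10,000) = 200,000
rows. When any of the four statistics is unavailable, the code falls back to
joinRowFactor * max(left row count, right row count) as before.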
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
new file mode 100644
index 0000000..a22552b
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Wraps the stats table info including schema and tableName. Also 
materializes stats from storage and keeps them in
+ * memory.
+ */
+public class DrillStatsTable {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DrillStatsTable.class);
+
+  /**
+   * List of columns in stats table.
+   */
+  public static final String COL_COLUMN = "column";
+  public static final String COL_COMPUTED = "computed";
+  public static final String COL_STATCOUNT = "statcount";
+  public static final String COL_NDV = "ndv";
+
+  private final String schemaName;
+  private final String tableName;
+
+  private final Map<String, Long> ndv = Maps.newHashMap();
+  private double rowCount = -1;
+
+  private boolean materialized = false;
+
+  public DrillStatsTable(String schemaName, String tableName) {
+    this.schemaName = schemaName;
+    this.tableName = tableName;
+  }
+
+  /**
+   * Get the number of distinct values of the given column, or null if stats are not present for it.
+   *
+   * Note: the returned value may be stale if the table data has changed since the stats were computed.
+   *
+   * @param col column name
+   * @return number of distinct values, or null if no statistics are available for the column
+   */
+  public Double getNdv(String col) {
+    Preconditions.checkState(materialized, "Stats are not yet materialized.");
+
+    final String upperCol = col.toUpperCase();
+    final Long ndvCol = ndv.get(upperCol);
+    if (ndvCol != null) {
+      return Math.min(ndvCol, rowCount);
+    }
+
+    return null;
+  }
+
+  /**
+   * Get the row count of the table, or null if stats are not present.
+   *
+   * Note: the returned value may be stale if the table data has changed since the stats were computed.
+   *
+   * @return row count, or null if no statistics are available
+   */
+  public Double getRowCount() {
+    Preconditions.checkState(materialized, "Stats are not yet materialized.");
+    return rowCount > 0 ? rowCount : null;
+  }
+
+  /**
+   * Read the stats from storage and keep them in memory.
+   * @param context query context used to read the statistics table
+   * @throws Exception if the statistics cannot be read
+   */
+  public void materialize(final QueryContext context) throws Exception {
+    if (materialized) {
+      return;
+    }
+
+    final String fullTableName = "`" + schemaName + "`.`" + tableName + "`";
+    final String sql = "SELECT a.* FROM " + fullTableName + " AS a INNER JOIN 
" +
+        "(SELECT `" + COL_COLUMN + "`, max(`" + COL_COMPUTED +"`) AS `" + 
COL_COMPUTED + "` " +
+        "FROM " + fullTableName + " GROUP BY `" + COL_COLUMN + "`) AS b " +
+        "ON a.`" + COL_COLUMN + "` = b.`" + COL_COLUMN +"` and a.`" + 
COL_COMPUTED + "` = b.`" + COL_COMPUTED + "`";
+
+    final DrillbitContext dc = context.getDrillbitContext();
+    try(final DrillClient client = new DrillClient(dc.getConfig(), 
dc.getClusterCoordinator(), dc.getAllocator())) {
+      /*final Listener listener = new Listener(dc.getAllocator());
+
+      client.connect();
+      client.runQuery(UserBitShared.QueryType.SQL, sql, listener);
+
+      listener.waitForCompletion();
+
+      for (Map<String, String> r : listener.results) {
+        ndv.put(r.get(COL_COLUMN).toUpperCase(), Long.valueOf(r.get(COL_NDV)));
+        rowCount = Math.max(rowCount, Long.valueOf(r.get(COL_STATCOUNT)));
+      }*/
+    }
+
+    materialized = true;
+  }
+
+  /**
+   * Materializes statistics for table scans in the plan that have an attached stats table.
+   */
+  public static class StatsMaterializationVisitor extends RelVisitor {
+    private QueryContext context;
+
+    public static void materialize(final RelNode relNode, final QueryContext 
context) {
+      new StatsMaterializationVisitor(context).go(relNode);
+    }
+
+    private StatsMaterializationVisitor(QueryContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public void visit(RelNode node, int ordinal, RelNode parent) {
+      if (node instanceof TableScan) {
+        try {
+          final DrillTable drillTable = 
node.getTable().unwrap(DrillTable.class);
+          final DrillStatsTable statsTable = drillTable.getStatsTable();
+          if (statsTable != null) {
+            statsTable.materialize(context);
+          }
+        } catch (Exception e) {
+          // Log a warning and proceed. We don't want to fail a query.
+          logger.warn("Failed to materialize the stats. Continuing without 
stats.", e);
+        }
+      }
+      super.visit(node, ordinal, parent);
+    }
+  }
+}
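Illustration (not part of the diff): for a hypothetical stats table with schemaName dfs.tmp and
tableName sales_stats, the query string assembled in materialize() renders as

    SELECT a.* FROM `dfs.tmp`.`sales_stats` AS a INNER JOIN (SELECT `column`, max(`computed`) AS `computed`
    FROM `dfs.tmp`.`sales_stats` GROUP BY `column`) AS b ON a.`column` = b.`column` and a.`computed` = b.`computed`

i.e. the most recently computed statistics row per column. Note that in this revision the
result-reading block is still commented out, so ndv and rowCount are not yet populated from the query.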
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
index 373683f..13f1600 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
@@ -17,17 +17,22 @@
  */
 package org.apache.drill.exec.planner.cost;
 
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelColumnOrigin;
 import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 
+import java.util.List;
+
 public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount {
   private static final DrillRelMdDistinctRowCount INSTANCE =
       new DrillRelMdDistinctRowCount();
@@ -61,4 +66,51 @@ public class DrillRelMdDistinctRowCount extends 
RelMdDistinctRowCount {
     // Consistent with the estimation of Aggregate row count in RelMdRowCount 
: distinctRowCount = rowCount * 10%.
     return scan.estimateRowCount(mq) * 0.1;
   }
+
+  public Double getDistinctRowCount(RelNode rel, RelMetadataQuery mq, 
ImmutableBitSet groupKey, RexNode predicate) {
+    if (rel instanceof DrillScanRel) {
+      return getDistinctRowCount((DrillScanRel) rel, mq, groupKey);
+    } else {
+      return super.getDistinctRowCount(rel, mq, groupKey, predicate);
+    }
+  }
+
+  /**
+   * Estimates the number of rows which would be produced by a GROUP BY on the
+   * set of columns indicated by groupKey.
+   * column").
+   */
+  private Double getDistinctRowCount(DrillScanRel scan, RelMetadataQuery mq, 
ImmutableBitSet groupKey) {
+    if (scan.getDrillTable() == null || scan.getDrillTable().getStatsTable() 
== null) {
+      // If there is no table or metadata (stats) table associated with scan, 
estimate the distinct row count.
+      // Consistent with the estimation of Aggregate row count in 
RelMdRowCount : distinctRowCount = rowCount * 10%.
+      return scan.getRows() * 0.1;
+    }
+
+    // TODO: maybe we should get the column origin of each group-by key before looking it up in the metadata table?
+    List<RelColumnOrigin> cols = Lists.newArrayList();
+
+    if (groupKey.length() == 0) {
+      return 0.0;
+    }
+
+    DrillStatsTable md = scan.getDrillTable().getStatsTable();
+
+    final double rc = mq.getRowCount(scan);
+    double s = 1.0;
+    for (int i = 0; i < groupKey.length(); i++) {
+      final String colName = scan.getRowType().getFieldNames().get(i);
+      if (!groupKey.get(i) && colName.equals("*")) {
+        continue;
+      }
+
+      Double d = md.getNdv(colName);
+      if (d == null) {
+        continue;
+      }
+
+      s *= 1 - d / rc;
+    }
+    return (1 - s) * rc;
+  }
 }
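Worked example (not part of the diff, hypothetical figures): for a scan of rc = 1,000 rows grouped on
two columns with NDVs 10 and 50, the loop above gives s = (1 - 10/1000) * (1 - 50/1000) =
0.99 * 0.95 = 0.9405, so the estimated number of distinct groups is (1 - 0.9405) * 1000 = 59.5.
Without a stats table, the method keeps the old rowCount * 10% heuristic.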
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
index 7f15fb3..343affb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.cost;
 
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Filter;
@@ -31,6 +32,8 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
 
 public class DrillRelMdRowCount extends RelMdRowCount {
   private static final DrillRelMdRowCount INSTANCE = new DrillRelMdRowCount();
@@ -81,4 +84,20 @@ public class DrillRelMdRowCount extends RelMdRowCount {
   public Double getRowCount(Join rel, RelMetadataQuery mq) {
     return rel.estimateRowCount(mq);
   }
+
+  public Double getRowCount(RelNode rel, RelMetadataQuery mq) {
+    if (rel instanceof DrillScanRel) {
+      return getRowCount((DrillScanRel)rel, mq);
+    }
+    return super.getRowCount(rel, mq);
+  }
+
+  private Double getRowCount(DrillScanRel scanRel, RelMetadataQuery mq) {
+    final DrillStatsTable md = scanRel.getDrillTable().getStatsTable();
+    if (md != null) {
+      return md.getRowCount();
+    }
+
+    return super.getRowCount(scanRel, mq);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAnalyzeRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAnalyzeRel.java
new file mode 100644
index 0000000..5d570f3
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAnalyzeRel.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.drill.common.logical.data.Analyze;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+
+import java.util.List;
+
+/**
+ * Drill logical node for "Analyze".
+ */
+public class DrillAnalyzeRel extends SingleRel implements DrillRel {
+
+  public DrillAnalyzeRel(RelOptCluster cluster, RelTraitSet traits, RelNode 
child) {
+    super(cluster, traits, child);
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
+    final double dRows = mq.getRowCount(getInput());
+    final double dCpu = dRows * DrillCostBase.COMPARE_CPU_COST;
+    final double dIo = 0;
+    return planner.getCostFactory().makeCost(dRows, dCpu, dIo);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new DrillAnalyzeRel(getCluster(), traitSet, sole(inputs));
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    final LogicalOperator inputOp = implementor.visitChild(this, 0, 
getInput());
+    final Analyze rel = new Analyze();
+    rel.setInput(inputOp);
+
+    return rel;
+  }
+
+  public static DrillAnalyzeRel convert(Analyze analyze, ConversionContext 
context) throws InvalidRelException {
+    RelNode input = context.toRel(analyze.getInput());
+    return new DrillAnalyzeRel(context.getCluster(), 
context.getLogicalTraits(), input);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index ed9b32f..afddbfc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.physical.base.SchemalessScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.server.options.SessionOptionManager;
@@ -45,6 +46,7 @@ public abstract class DrillTable implements Table {
   private final Object selection;
   private final StoragePlugin plugin;
   private final String userName;
+  private DrillStatsTable statsTable;
   private GroupScan scan;
   private SessionOptionManager options;
 
@@ -131,6 +133,14 @@ public abstract class DrillTable implements Table {
     return Statistics.UNKNOWN;
   }
 
+  public DrillStatsTable getStatsTable() {
+    return statsTable;
+  }
+
+  public void setStatsTable(DrillStatsTable statsTable) {
+    this.statsTable = statsTable;
+  }
+
   public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) {
     return new DrillScanRel(context.getCluster(),
         context.getCluster().traitSetOf(DrillRel.DRILL_LOGICAL),
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
index 23ea23f..6869616 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
@@ -46,6 +46,7 @@ public class FileSystemCreateTableEntry implements 
CreateTableEntry {
   private FileSystemConfig storageConfig;
   private FormatPlugin formatPlugin;
   private String location;
+  private boolean append;
   private final List<String> partitionColumns;
   private final StorageStrategy storageStrategy;
 
@@ -53,6 +54,7 @@ public class FileSystemCreateTableEntry implements 
CreateTableEntry {
   public FileSystemCreateTableEntry(@JsonProperty("storageConfig") 
FileSystemConfig storageConfig,
                                     @JsonProperty("formatConfig") 
FormatPluginConfig formatConfig,
                                     @JsonProperty("location") String location,
+                                    @JsonProperty("append") boolean append,
                                     @JsonProperty("partitionColumn") 
List<String> partitionColumns,
                                     @JsonProperty("storageStrategy") 
StorageStrategy storageStrategy,
                                     @JacksonInject StoragePluginRegistry 
engineRegistry)
@@ -67,11 +69,13 @@ public class FileSystemCreateTableEntry implements 
CreateTableEntry {
   public FileSystemCreateTableEntry(FileSystemConfig storageConfig,
                                     FormatPlugin formatPlugin,
                                     String location,
+                                    boolean append,
                                     List<String> partitionColumns,
                                     StorageStrategy storageStrategy) {
     this.storageConfig = storageConfig;
     this.formatPlugin = formatPlugin;
     this.location = location;
+    this.append = append;
     this.partitionColumns = partitionColumns;
     this.storageStrategy = storageStrategy;
   }
@@ -94,7 +98,7 @@ public class FileSystemCreateTableEntry implements 
CreateTableEntry {
           formatPlugin.getName())).build(logger);
     }
 
-    AbstractWriter writer = formatPlugin.getWriter(child, location, 
partitionColumns);
+    AbstractWriter writer = formatPlugin.getWriter(child, location, append, 
partitionColumns);
     writer.setStorageStrategy(storageStrategy);
     return writer;
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java
new file mode 100644
index 0000000..4cac5d9
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.planner.logical.DrillAnalyzeRel;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+import java.util.List;
+
+public class AnalyzePrule extends Prule {
+  public static final RelOptRule INSTANCE = new AnalyzePrule();
+
+  private static final List<String> FUNCTIONS = ImmutableList.of(
+      "statcount", // total number of entries in the table
+      "nonnullstatcount", // total number of non-null entries in the table
+      "ndv",  // total distinctive values in table
+      "hll" // HyperLogLog
+  );
+
+  public AnalyzePrule() {
+    super(RelOptHelper.some(DrillAnalyzeRel.class, DrillRel.DRILL_LOGICAL, 
RelOptHelper.any(RelNode.class)), "Prel.AnalyzePrule");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillAnalyzeRel analyze = (DrillAnalyzeRel) call.rel(0);
+    final RelNode input = call.rel(1);
+
+    final RelTraitSet traits = 
input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
+    final RelNode convertedInput = convert(input, traits);
+
+    final StatsAggPrel statsAggPrel = new StatsAggPrel(convertedInput, 
analyze.getCluster(), FUNCTIONS);
+
+    final List<String> mapFields = Lists.newArrayList(FUNCTIONS);
+    mapFields.add(DrillStatsTable.COL_COLUMN);
+    final SingleRel newAnalyze = new UnpivotMapsPrel(statsAggPrel, analyze.getCluster(), mapFields);
+
+    call.transformTo(newAnalyze);
+  }
+}
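Illustrative plan shape (not part of the diff; labels are approximate, not actual plan text): the
physical plan produced for ANALYZE ends up roughly as

    Screen
      Writer               (added by AnalyzeTableHandler, appending to the table's statistics table)
        UnpivotMaps        (mapFields: statcount, nonnullstatcount, ndv, hll, column)
          StatsAgg         (functions: statcount, nonnullstatcount, ndv, hll)
            Scan           (table being analyzed)

where this rule contributes the StatsAgg/UnpivotMaps pair on a SINGLETON-distributed input.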
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StatsAggPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StatsAggPrel.java
new file mode 100644
index 0000000..124246b
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StatsAggPrel.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.StatisticsAggregate;
+import org.apache.drill.exec.planner.common.DrillRelNode;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class StatsAggPrel extends SingleRel implements DrillRelNode, Prel {
+
+  private List<String> functions;
+
+  public StatsAggPrel(RelNode child, RelOptCluster cluster, List<String> 
functions) {
+    super(cluster, child.getTraitSet(), child);
+    this.functions = ImmutableList.copyOf(functions);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new StatsAggPrel(sole(inputs), getCluster(), 
ImmutableList.copyOf(functions));
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
+      throws IOException {
+    Prel child = (Prel) this.getInput();
+
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    StatisticsAggregate g = new StatisticsAggregate(childPOP, functions);
+
+    return creator.addMetadata(this, g);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getInput());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> 
logicalVisitor, X value)
+      throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.ALL;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return true;
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnpivotMapsPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnpivotMapsPrel.java
new file mode 100644
index 0000000..4fc7aae
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnpivotMapsPrel.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.UnpivotMaps;
+import org.apache.drill.exec.planner.common.DrillRelNode;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+public class UnpivotMapsPrel extends SingleRel implements Prel, DrillRelNode {
+
+  private List<String> mapFieldsNames;
+
+  public UnpivotMapsPrel(RelNode child, RelOptCluster cluster, List<String> 
mapFieldsNames) {
+    super(cluster, child.getTraitSet(), child);
+    this.mapFieldsNames = mapFieldsNames;
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new UnpivotMapsPrel(sole(inputs), getCluster(), mapFieldsNames);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
+      throws IOException {
+    Prel child = (Prel) this.getInput();
+
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    UnpivotMaps um = new UnpivotMaps(childPOP, mapFieldsNames);
+    return creator.addMetadata(this, um);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getInput());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> 
logicalVisitor, X value)
+      throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
new file mode 100644
index 0000000..cdfe31b
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.DrillAnalyzeRel;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScreenRel;
+import org.apache.drill.exec.planner.logical.DrillStoreRel;
+import org.apache.drill.exec.planner.logical.DrillWriterRel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.planner.sql.parser.SqlAnalyzeTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
+
+import java.io.IOException;
+import java.util.List;
+
+public class AnalyzeTableHandler extends DefaultSqlHandler {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AnalyzeTableHandler.class);
+
+  public AnalyzeTableHandler(SqlHandlerConfig config, Pointer<String> 
textPlan) {
+    super(config, textPlan);
+  }
+
+  @Override
+  public PhysicalPlan getPlan(SqlNode sqlNode)
+      throws ValidationException, RelConversionException, IOException, 
ForemanSetupException {
+    final SqlAnalyzeTable sqlAnalyzeTable = unwrap(sqlNode, 
SqlAnalyzeTable.class);
+
+    verifyNoUnsupportedFunctions(sqlAnalyzeTable);
+
+    SqlIdentifier tableIdentifier = sqlAnalyzeTable.getTableIdentifier();
+    SqlSelect scanSql = new SqlSelect(
+        SqlParserPos.ZERO, /* position */
+        SqlNodeList.EMPTY, /* keyword list */
+        getColumnList(sqlAnalyzeTable), /*select list */
+        tableIdentifier, /* from */
+        null, /* where */
+        null, /* group by */
+        null, /* having */
+        null, /* windowDecls */
+        null, /* orderBy */
+        null, /* offset */
+        null /* fetch */
+    );
+
+    final ConvertedRelNode convertedRelNode = 
validateAndConvert(rewrite(scanSql));
+    final RelDataType validatedRowType = 
convertedRelNode.getValidatedRowType();
+
+    final RelNode relScan = convertedRelNode.getConvertedNode();
+
+    final String tableName = sqlAnalyzeTable.getName();
+    final AbstractSchema drillSchema = 
SchemaUtilites.resolveToMutableDrillSchema(
+        config.getConverter().getDefaultSchema(), 
sqlAnalyzeTable.getSchemaPath());
+
+    if (SqlHandlerUtil.getTableFromSchema(drillSchema, tableName) == null) {
+      throw UserException.validationError()
+          .message("No table with given name [%s] exists in schema [%s]", 
tableName, drillSchema.getFullSchemaName())
+          .build(logger);
+    }
+
+    // Convert the query to a Drill logical plan and insert the analyze and writer operators on top.
+    DrillRel drel = convertToDrel(relScan, drillSchema, tableName);
+    Prel prel = convertToPrel(drel, validatedRowType);
+    logAndSetTextPlan("Drill Physical", prel, logger);
+    PhysicalOperator pop = convertToPop(prel);
+    PhysicalPlan plan = convertToPlan(pop);
+    log("Drill Plan", plan, logger);
+
+    return plan;
+  }
+
+  private SqlNodeList getColumnList(final SqlAnalyzeTable sqlAnalyzeTable) {
+    final SqlNodeList columnList = new SqlNodeList(SqlParserPos.ZERO);
+
+    final List<String> fields = sqlAnalyzeTable.getFieldNames();
+    if (fields == null || fields.size() <= 0) {
+      columnList.add(new SqlIdentifier("*", SqlParserPos.ZERO));
+    } else {
+      for(String field : fields) {
+        columnList.add(new SqlIdentifier(field, SqlParserPos.ZERO));
+      }
+    }
+
+    return columnList;
+  }
+
+  protected DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, 
String analyzeTableName)
+      throws RelConversionException, SqlUnsupportedException {
+    final DrillRel convertedRelNode = convertToDrel(relNode);
+
+    if (convertedRelNode instanceof DrillStoreRel) {
+      throw new UnsupportedOperationException();
+    }
+
+    final RelNode analyzeRel = new DrillAnalyzeRel(
+        convertedRelNode.getCluster(),
+        convertedRelNode.getTraitSet(),
+        convertedRelNode
+    );
+
+    final RelNode writerRel = new DrillWriterRel(
+        analyzeRel.getCluster(),
+        analyzeRel.getTraitSet(),
+        analyzeRel,
+        schema.appendToStatsTable(analyzeTableName)
+    );
+
+    return new DrillScreenRel(writerRel.getCluster(), writerRel.getTraitSet(), 
writerRel);
+  }
+
+  // make sure no unsupported features in ANALYZE statement are used
+  private static void verifyNoUnsupportedFunctions(final SqlAnalyzeTable 
analyzeTable) {
+    // throw unsupported error for functions that are not yet implemented
+    if (analyzeTable.getEstimate()) {
+      throw UserException.unsupportedError()
+          .message("Statistics estimation is not yet supported.")
+          .build(logger);
+    }
+
+    if (analyzeTable.getPercent() != 100) {
+      throw UserException.unsupportedError()
+          .message("Statistics from sampling is not yet supported.")
+          .build(logger);
+    }
+  }
+}
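Illustration (not part of the diff, hypothetical names): for a statement such as
ANALYZE TABLE dfs.tmp.sales COMPUTE STATISTICS FOR COLUMNS (region_id, cnt) SAMPLE 100 PERCENT,
getColumnList() yields region_id and cnt, the handler builds a scan equivalent to
SELECT region_id, cnt FROM dfs.tmp.sales, and the converted scan is wrapped as
DrillAnalyzeRel -> DrillWriterRel (via schema.appendToStatsTable("sales")) -> DrillScreenRel.
ESTIMATE and sample percentages other than 100 are rejected up front by verifyNoUnsupportedFunctions().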
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 0881dc1..52ae7b1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -73,6 +73,7 @@ import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.planner.PlannerType;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import 
org.apache.drill.exec.planner.common.DrillStatsTable.StatsMaterializationVisitor;
 import org.apache.drill.exec.planner.cost.DrillDefaultRelMetadataProvider;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
@@ -230,6 +231,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     }
 
     try {
+      StatsMaterializationVisitor.materialize(relNode, context);
 
       // HEP for rules, which are failed at the LOGICAL_PLANNING stage for 
Volcano planner
       final RelNode setOpTransposeNode = transform(PlannerType.HEP, 
PlannerPhase.PRE_LOGICAL_PLANNING, relNode);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
index 57a7e17..210f43b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
@@ -64,6 +64,7 @@ public class CompoundIdentifierConverter extends SqlShuttle {
     // Every element of the array corresponds to the item in the list
     // returned by getOperandList() method for concrete SqlCall implementation.
     REWRITE_RULES = ImmutableMap.<Class<? extends SqlCall>, 
RewriteType[]>builder()
+        .put(SqlAnalyzeTable.class, arrayOf(D, D, E, D))
         .put(SqlSelect.class, arrayOf(D, E, D, E, E, E, E, E, D, D))
         .put(SqlCreateTable.class, arrayOf(D, D, D, E, D, D))
         .put(SqlCreateView.class, arrayOf(D, E, E, D))
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java
new file mode 100644
index 0000000..91f83bf
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import java.util.List;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.Util;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AnalyzeTableHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+import org.apache.drill.exec.util.Pointer;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+/**
+ * SQL tree for ANALYZE statement.
+ */
+public class SqlAnalyzeTable extends DrillSqlCall {
+  public static final SqlSpecialOperator OPERATOR = new 
SqlSpecialOperator("ANALYZE_TABLE", SqlKind.OTHER) {
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, 
SqlNode... operands) {
+      Preconditions.checkArgument(operands.length == 4, 
"SqlAnalyzeTable.createCall() has to get 4 operands!");
+      return new SqlAnalyzeTable(pos, (SqlIdentifier) operands[0], 
(SqlLiteral) operands[1],
+          (SqlNodeList) operands[2], (SqlNumericLiteral) operands[3]
+      );
+    }
+  };
+
+  private final SqlIdentifier tblName;
+  private final SqlLiteral estimate;
+  private final SqlNodeList fieldList;
+  private final SqlNumericLiteral percent;
+
+  public SqlAnalyzeTable(SqlParserPos pos, SqlIdentifier tblName, SqlLiteral 
estimate,
+      SqlNodeList fieldList, SqlNumericLiteral percent) {
+    super(pos);
+    this.tblName = tblName;
+    this.estimate = estimate;
+    this.fieldList = fieldList;
+    this.percent = percent;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    final List<SqlNode> operands = Lists.newArrayListWithCapacity(4);
+    operands.add(tblName);
+    operands.add(estimate);
+    operands.add(fieldList);
+    operands.add(percent);
+    return operands;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("ANALYZE");
+    writer.keyword("TABLE");
+    tblName.unparse(writer, leftPrec, rightPrec);
+    writer.keyword(estimate.booleanValue() ? "ESTIMATE" : "COMPUTE");
+    writer.keyword("STATISTICS");
+    writer.keyword("FOR");
+
+    if (fieldList != null && fieldList.size() > 0) {
+      writer.keyword("COLUMNS");
+      writer.keyword("(");
+      fieldList.get(0).unparse(writer, leftPrec, rightPrec);
+      for (int i = 1; i < fieldList.size(); i++) {
+        writer.keyword(",");
+        fieldList.get(i).unparse(writer, leftPrec, rightPrec);
+      }
+      writer.keyword(")");
+    } else {
+      writer.keyword("ALL");
+      writer.keyword("COLUMNS");
+    }
+    writer.keyword("SAMPLE");
+    percent.unparse(writer, leftPrec, rightPrec);
+    writer.keyword("PERCENT");
+  }
+
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config, 
Pointer<String> textPlan) {
+    return new AnalyzeTableHandler(config, textPlan);
+  }
+
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+    return getSqlHandler(config, null);
+  }
+
+  public List<String> getSchemaPath() {
+    if (tblName.isSimple()) {
+      return ImmutableList.of();
+    }
+
+    return tblName.names.subList(0, tblName.names.size() - 1);
+  }
+
+  public SqlIdentifier getTableIdentifier() {
+    return tblName;
+  }
+
+  public String getName() {
+    return Util.last(tblName.names);
+  }
+
+  public List<String> getFieldNames() {
+    if (fieldList == null) {
+      return ImmutableList.of();
+    }
+
+    List<String> columnNames = Lists.newArrayList();
+    for (SqlNode node : fieldList.getList()) {
+      columnNames.add(node.toString());
+    }
+    return columnNames;
+  }
+
+  public boolean getEstimate() {
+    return estimate.booleanValue();
+  }
+
+  public int getPercent() {
+    return percent.intValue(true);
+  }
+}
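Illustration (not part of the diff; identifier quoting and whitespace depend on the SqlWriter,
names are hypothetical): unparse() canonicalizes the two variants as

    ANALYZE TABLE dfs.tmp.sales COMPUTE STATISTICS FOR COLUMNS (region_id, cnt) SAMPLE 100 PERCENT
    ANALYZE TABLE dfs.tmp.sales ESTIMATE STATISTICS FOR ALL COLUMNS SAMPLE 50 PERCENT

Only the COMPUTE ... SAMPLE 100 PERCENT form is accepted by AnalyzeTableHandler in this patch.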
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 8f4f067..c510525 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -149,6 +149,40 @@ public abstract class AbstractSchema implements Schema, 
SchemaPartitionExplorer,
   }
 
   /**
+   * Create a statistics table entry for the given <i>tableName</i>.
+   * @param tableName name of the statistics table to create
+   * @return entry describing how to create the statistics table
+   */
+  public CreateTableEntry createStatsTable(String tableName) {
+    throw UserException.unsupportedError()
+        .message("Statistics tables are not supported in schema [%s]", 
getSchemaPath())
+        .build(logger);
+  }
+
+  /**
+   * Create an entry for appending to the statistics table for the given <i>tableName</i>.
+   * If no statistics table exists yet, a new one is created.
+   * @param tableName name of the table whose statistics are being appended
+   * @return entry describing how to append to the statistics table
+   */
+  public CreateTableEntry appendToStatsTable(String tableName) {
+    throw UserException.unsupportedError()
+        .message("Statistics tables are not supported in schema [%s]", 
getSchemaPath())
+        .build(logger);
+  }
+
+  /**
+   * Get the statistics table for the given <i>tableName</i>.
+   * @param tableName name of the table whose statistics table is requested
+   * @return the statistics table
+   */
+  public Table getStatsTable(String tableName) {
+    throw UserException.unsupportedError()
+        .message("Statistics tables are not supported in schema [%s]", 
getSchemaPath())
+        .build(logger);
+  }
+
+  /**
    * Reports whether to show items from this schema in INFORMATION_SCHEMA
    * tables.
    * (Controls ... TODO:  Doc.:  Mention what this typically controls or
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
index 3a747a6..2539c64 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
@@ -68,6 +68,21 @@ public class SubSchemaWrapper extends AbstractSchema {
   }
 
   @Override
+  public CreateTableEntry createStatsTable(String tableName) {
+    return innerSchema.createStatsTable(tableName);
+  }
+
+  @Override
+  public CreateTableEntry appendToStatsTable(String tableName) {
+    return innerSchema.appendToStatsTable(tableName);
+  }
+
+  @Override
+  public Table getStatsTable(String tableName) {
+    return innerSchema.getStatsTable(tableName);
+  }
+
+  @Override
   public Collection<Function> getFunctions(String name) {
     return innerSchema.getFunctions(name);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 795cbd2..45dea13 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -164,6 +164,21 @@ public class FileSystemSchemaFactory extends 
AbstractSchemaFactory {
     }
 
     @Override
+    public CreateTableEntry createStatsTable(String tableName) {
+      return defaultSchema.createStatsTable(tableName);
+    }
+
+    @Override
+    public CreateTableEntry appendToStatsTable(String tableName) {
+      return defaultSchema.appendToStatsTable(tableName);
+    }
+
+    @Override
+    public Table getStatsTable(String tableName) {
+      return defaultSchema.getStatsTable(tableName);
+    }
+
+    @Override
     public AbstractSchema getDefaultSchema() {
       return defaultSchema;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 27a72e3..bf258c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -49,7 +49,7 @@ public interface FormatPlugin {
 
   FormatMatcher getMatcher();
 
-  AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException;
+  public AbstractWriter getWriter(PhysicalOperator child, String location, boolean append, List<String> partitionColumns) throws IOException;
 
   Set<StoragePluginOptimizerRule> getOptimizerRules();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 493278c..fc51221 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs;
 
 import static java.util.Collections.unmodifiableList;
+import static org.apache.drill.exec.dotdrill.DotDrillType.STATS;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -44,6 +45,7 @@ import org.apache.calcite.schema.TranslatableTable;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.config.LogicalPlanPersistence;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -53,7 +55,7 @@ import org.apache.drill.exec.dotdrill.DotDrillFile;
 import org.apache.drill.exec.dotdrill.DotDrillType;
 import org.apache.drill.exec.dotdrill.DotDrillUtil;
 import org.apache.drill.exec.dotdrill.View;
-import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
@@ -65,6 +67,8 @@ import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.easy.json.JSONFormatPlugin;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -202,6 +206,17 @@ public class WorkspaceSchemaFactory {
     return plugin;
   }
 
+  // Ensure given tableName is not a stats table
+  private static void ensureNotStatsTable(final String tableName) {
+    if (tableName.toLowerCase().endsWith(STATS.getEnding())) {
+      throw UserException
+          .validationError()
+          .message("Given table [%s] is already a stats table. " +
+              "Cannot perform stats operations on a stats table.", tableName)
+          .build(logger);
+    }
+  }
+
   /**
    * Implementation of a table macro that generates a table based on parameters
    */
@@ -551,7 +566,47 @@ public class WorkspaceSchemaFactory {
       } catch (UnsupportedOperationException e) {
         logger.debug("The filesystem for this workspace does not support this 
operation.", e);
       }
-      return tables.get(tableKey);
+      final DrillTable table = tables.get(tableKey);
+      setMetadataTable(table, tableName);
+      return table;
+    }
+
+    private void setMetadataTable(final DrillTable table, final String tableName) {
+      if (table == null) {
+        return;
+      }
+
+      // If this itself is the stats table, then skip it.
+      if (tableName.toLowerCase().endsWith(STATS.getEnding())) {
+        return;
+      }
+
+      try {
+        if (table.getStatsTable() == null) {
+          Table statsTable = getStatsTable(tableName);
+          if (statsTable != null) {
+            table.setStatsTable(new DrillStatsTable(getFullSchemaName(), getStatsTableName(tableName)));
+          }
+        }
+      } catch (final Exception e) {
+        logger.warn("Failed to find the stats table for table [{}] in schema 
[{}]", tableName, getFullSchemaName());
+      }
+    }
+
+    // Get stats table name for a given table name.
+    private String getStatsTableName(final String tableName) {
+      final Path tablePath = new Path(config.getLocation(), tableName);
+      try {
+        if (fs.isDirectory(tablePath)) {
+          return tableName + Path.SEPARATOR + STATS.getEnding();
+        } else {
+          return tableName + STATS.getEnding();
+        }
+      } catch (final Exception e) {
+        throw new DrillRuntimeException(
+            String.format("Failed to find the location of the stats for table 
[%s] in schema [%s]",
+                tableName, getFullSchemaName()));
+      }
     }
 
     @Override
@@ -571,6 +626,34 @@ public class WorkspaceSchemaFactory {
     public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
       String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
       FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
+      return createOrAppendToTable(tableName, false, formatPlugin, partitionColumns, storageStrategy);
+    }
+
+    @Override
+    public CreateTableEntry createStatsTable(String tableName) {
+      ensureNotStatsTable(tableName);
+      final String statsTableName = getStatsTableName(tableName);
+      FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME);
+      return createOrAppendToTable(statsTableName, false, formatPlugin, ImmutableList.<String>of(),
+          StorageStrategy.DEFAULT);
+    }
+
+    @Override
+    public CreateTableEntry appendToStatsTable(String tableName) {
+      ensureNotStatsTable(tableName);
+      final String statsTableName = getStatsTableName(tableName);
+      FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME);
+      return createOrAppendToTable(statsTableName, true, formatPlugin, ImmutableList.<String>of(),
+          StorageStrategy.DEFAULT);
+    }
+
+    @Override
+    public Table getStatsTable(String tableName) {
+      return getTable(getStatsTableName(tableName));
+    }
+
+    private CreateTableEntry createOrAppendToTable(String tableName, boolean append, FormatPlugin formatPlugin,
+        List<String> partitionColumns, StorageStrategy storageStrategy) {
       if (formatPlugin == null) {
         throw new UnsupportedOperationException(
           String.format("Unsupported format '%s' in workspace '%s'", 
config.getDefaultInputFormat(),
@@ -581,7 +664,8 @@ public class WorkspaceSchemaFactory {
           (FileSystemConfig) plugin.getConfig(),
           formatPlugin,
           config.getLocation() + Path.SEPARATOR + tableName,
-          partitionColumns,
+          append,
+          partitionColumns,
           storageStrategy);
     }
 
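For readers tracing the lookup above: the stats table name produced by getStatsTableName follows a simple rule, sketched below as a self-contained illustration. The sketch assumes DotDrillType.STATS.getEnding() resolves to ".stats.drill" (consistent with the paths queried in TestAnalyze later in this patch); the class and method names are illustrative only, not part of the change.

    // Illustrative sketch of the stats-table naming rule (assumed suffix ".stats.drill").
    public class StatsNameSketch {
      static String statsTableName(String tableName, boolean isDirectory) {
        final String ending = ".stats.drill";  // assumed value of DotDrillType.STATS.getEnding()
        // Directory-backed tables keep the stats file inside the directory;
        // file-backed tables get the suffix appended to the table name itself.
        return isDirectory ? tableName + "/" + ending : tableName + ending;
      }

      public static void main(String[] args) {
        System.out.println(statsTableName("region_basic1", true));   // region_basic1/.stats.drill
        System.out.println(statsTableName("region.json", false));    // region.json.stats.drill
      }
    }
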
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 4c550c3..ed1651e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -193,8 +193,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   }
 
   @Override
-  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException {
-    return new EasyWriter(child, location, partitionColumns, this);
+  public AbstractWriter getWriter(PhysicalOperator child, String location, boolean append, List<String> partitionColumns) throws IOException {
+    return new EasyWriter(child, location, append, partitionColumns, this);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index 379e2c9..9f41206 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -40,6 +40,7 @@ public class EasyWriter extends AbstractWriter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriter.class);
 
   private final String location;
+  private final boolean append;
   private final List<String> partitionColumns;
   private final EasyFormatPlugin<?> formatPlugin;
 
@@ -47,6 +48,7 @@ public class EasyWriter extends AbstractWriter {
   public EasyWriter(
       @JsonProperty("child") PhysicalOperator child,
       @JsonProperty("location") String location,
+      @JsonProperty("append") boolean append,
       @JsonProperty("partitionColumns") List<String> partitionColumns,
       @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
       @JsonProperty("storage") StoragePluginConfig storageConfig,
@@ -57,18 +59,21 @@ public class EasyWriter extends AbstractWriter {
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.location = location;
+    this.append = append;
     this.partitionColumns = partitionColumns;
     setStorageStrategy(storageStrategy);
   }
 
   public EasyWriter(PhysicalOperator child,
                          String location,
+                         boolean append,
                          List<String> partitionColumns,
                          EasyFormatPlugin<?> formatPlugin) {
 
     super(child);
     this.formatPlugin = formatPlugin;
     this.location = location;
+    this.append = append;
     this.partitionColumns = partitionColumns;
   }
 
@@ -77,6 +82,11 @@ public class EasyWriter extends AbstractWriter {
     return location;
   }
 
+  @JsonProperty("append")
+  public boolean getAppend() {
+    return append;
+  }
+
   @JsonProperty("storage")
   public StoragePluginConfig getStorageConfig(){
     return formatPlugin.getStorageConfig();
@@ -94,7 +104,7 @@ public class EasyWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    EasyWriter writer = new EasyWriter(child, location, partitionColumns, formatPlugin);
+    EasyWriter writer = new EasyWriter(child, location, append, partitionColumns, formatPlugin);
     writer.setStorageStrategy(getStorageStrategy());
     return writer;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 11dc204..ab90cda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -70,6 +70,8 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
+    options.put("append", Boolean.toString(writer.getAppend()));
+
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 2e80b3f..d533c0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -48,6 +48,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
 
   private Path cleanUpLocation;
   private String location;
+  private boolean append;
   private String prefix;
 
   private String fieldDelimiter;
@@ -74,6 +75,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   @Override
   public void init(Map<String, String> writerOptions) throws IOException {
     this.location = writerOptions.get("location");
+    this.append = writerOptions.get("append").equalsIgnoreCase("true") ? true 
: false;
     this.prefix = writerOptions.get("prefix");
     this.fieldDelimiter = writerOptions.get("separator");
     this.extension = writerOptions.get("extension");
@@ -83,7 +85,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
 
     this.fs = FileSystem.get(fsConf);
 
-    Path fileName = new Path(location, prefix + "_" + index + "." + extension);
+    Path fileName;
+    do {
+      fileName = new Path(location, prefix + "_" + (index++) + "." + extension);
+    } while (append && fs.exists(fileName));
+
     try {
      // json writer does not support partitions, so only one file can be created
       // and thus only one location should be deleted in case of abort
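The append flag introduced here only changes how the writer picks its output file name: when append is true, the index is advanced past names that already exist, so a later ANALYZE run adds a new file beside the earlier ones instead of overwriting them. A minimal standalone sketch of that probe, with a Set<String> standing in for fs.exists() and an assumed fragment prefix of "0_0":

    import java.util.HashSet;
    import java.util.Set;

    // Standalone sketch of the append naming probe; the names and starting index are assumptions.
    public class AppendNameSketch {
      static String nextFileName(Set<String> existing, String prefix, String extension, boolean append) {
        int index = 0;
        String fileName;
        do {
          fileName = prefix + "_" + (index++) + "." + extension;
        } while (append && existing.contains(fileName));  // skip names already present
        return fileName;
      }

      public static void main(String[] args) {
        Set<String> existing = new HashSet<>();
        existing.add("0_0_0.json");
        existing.add("0_0_1.json");
        System.out.println(nextFileName(existing, "0_0", "json", true));   // 0_0_2.json
        System.out.println(nextFileName(existing, "0_0", "json", false));  // 0_0_0.json (would be overwritten)
      }
    }
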
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index f46cc1c..876cd5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -126,14 +126,15 @@ public class ParquetFormatPlugin implements FormatPlugin {
   }
 
   @Override
-  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
-    return new ParquetWriter(child, location, partitionColumns, this);
+  public AbstractWriter getWriter(PhysicalOperator child, String location, boolean append, List<String> partitionColumns) throws IOException {
+    return new ParquetWriter(child, location, append, partitionColumns, this);
   }
 
   public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter writer) throws IOException, OutOfMemoryException {
     Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
+    options.put("append", Boolean.toString(writer.getAppend()));
 
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
@@ -261,6 +262,9 @@ public class ParquetFormatPlugin implements FormatPlugin {
               new FormatSelection(plugin.getConfig(), selection));
         }
       }
+      if (!super.supportDirectoryReads() && selection.containsDirectories(fs)) {
+        return null;
+      }
       return super.isReadable(fs, selection, fsPlugin, storageEngineName, schemaConfig);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index aea3218..6298c1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -53,6 +53,7 @@ public class ParquetWriter extends AbstractWriter {
   public static final int WRITER_VERSION = 2;
 
   private final String location;
+  private final boolean append;
   private final List<String> partitionColumns;
   private final ParquetFormatPlugin formatPlugin;
 
@@ -60,6 +61,7 @@ public class ParquetWriter extends AbstractWriter {
   public ParquetWriter(
           @JsonProperty("child") PhysicalOperator child,
           @JsonProperty("location") String location,
+          @JsonProperty("append") boolean append,
           @JsonProperty("partitionColumns") List<String> partitionColumns,
           @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
           @JsonProperty("storage") StoragePluginConfig storageConfig,
@@ -69,18 +71,21 @@ public class ParquetWriter extends AbstractWriter {
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, new ParquetFormatConfig());
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.location = location;
+    this.append = append;
     this.partitionColumns = partitionColumns;
     setStorageStrategy(storageStrategy);
   }
 
   public ParquetWriter(PhysicalOperator child,
                        String location,
+                       boolean append,
                        List<String> partitionColumns,
                        ParquetFormatPlugin formatPlugin) {
 
     super(child);
     this.formatPlugin = formatPlugin;
     this.location = location;
+    this.append = append;
     this.partitionColumns = partitionColumns;
   }
 
@@ -89,6 +94,11 @@ public class ParquetWriter extends AbstractWriter {
     return location;
   }
 
+  @JsonProperty("append")
+  public boolean getAppend() {
+    return append;
+  }
+
   @JsonProperty("storage")
   public StoragePluginConfig getStorageConfig(){
     return formatPlugin.getStorageConfig();
@@ -111,7 +121,7 @@ public class ParquetWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    ParquetWriter writer = new ParquetWriter(child, location, partitionColumns, formatPlugin);
+    ParquetWriter writer = new ParquetWriter(child, location, append, partitionColumns, formatPlugin);
     writer.setStorageStrategy(getStorageStrategy());
     return writer;
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
new file mode 100644
index 0000000..0f15fb3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.sql;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.ExecConstants;
+import org.junit.Test;
+
+public class TestAnalyze extends PlanTestBase {
+
+  // Analyze for all columns
+  @Test
+  public void basic1() throws Exception {
+    try {
+      test("ALTER SESSION SET `planner.slice_target` = 1");
+      test("CREATE TABLE dfs_test.tmp.region_basic1 AS SELECT * from 
cp.`region.json`");
+      test("ANALYZE TABLE dfs_test.tmp.region_basic1 COMPUTE STATISTICS FOR 
ALL COLUMNS");
+      test("SELECT * FROM dfs_test.tmp.`region_basic1/.stats.drill`");
+
+      testBuilder()
+          .sqlQuery("SELECT `column`, statcount, nonnullstatcount, ndv FROM 
dfs_test.tmp.`region_basic1/.stats.drill`")
+          .unOrdered()
+          .baselineColumns("column", "statcount", "nonnullstatcount", "ndv")
+          .baselineValues("region_id", 110L, 110L, 107L)
+          .baselineValues("sales_city", 110L, 110L, 111L)
+          .baselineValues("sales_state_province", 110L, 110L, 13L)
+          .baselineValues("sales_district", 110L, 110L, 22L)
+          .baselineValues("sales_region", 110L, 110L, 8L)
+          .baselineValues("sales_country", 110L, 110L, 4L)
+          .baselineValues("sales_district_id", 110L, 110L, 23L)
+          .go();
+
+      // we can't compare the ndv for correctness as it is an estimate and not accurate
+      testBuilder()
+          .sqlQuery("SELECT statcount FROM dfs_test.tmp.`region_basic1/.stats.drill` WHERE `column` = 'region_id'")
+          .unOrdered()
+          .sqlBaselineQuery("SELECT count(region_id) AS statcount FROM dfs_test.tmp.region_basic1")
+          .go();
+
+    } finally {
+      test("ALTER SESSION SET `planner.slice_target` = " + 
ExecConstants.SLICE_TARGET_DEFAULT);
+    }
+  }
+
+  // Analyze for only a subset of the columns in table
+  @Test
+  public void basic2() throws Exception {
+    try {
+      test("ALTER SESSION SET `planner.slice_target` = 1");
+      test("CREATE TABLE dfs_test.tmp.employee_basic2 AS SELECT * from 
cp.`employee.json`");
+      test("ANALYZE TABLE dfs_test.tmp.employee_basic2 COMPUTE STATISTICS FOR 
COLUMNS (employee_id, birth_date)");
+      test("SELECT * FROM dfs_test.tmp.`employee_basic2/.stats.drill`");
+
+      testBuilder()
+          .sqlQuery("SELECT `column`, statcount, nonnullstatcount, ndv FROM 
dfs_test.tmp.`employee_basic2/.stats.drill`")
+          .unOrdered()
+          .baselineColumns("column", "statcount", "nonnullstatcount", "ndv")
+          .baselineValues("employee_id", 1155L, 1155L, 1144L)
+          .baselineValues("birth_date", 1155L, 1155L, 53L)
+          .go();
+
+      // we can't compare the ndv for correctness as it is an estimate and not accurate
+      testBuilder()
+          .sqlQuery("SELECT statcount FROM dfs_test.tmp.`employee_basic2/.stats.drill` WHERE `column` = 'birth_date'")
+          .unOrdered()
+          .sqlBaselineQuery("SELECT count(birth_date) AS statcount FROM dfs_test.tmp.employee_basic2")
+          .go();
+
+    } finally {
+      test("ALTER SESSION SET `planner.slice_target` = " + 
ExecConstants.SLICE_TARGET_DEFAULT);
+    }
+  }
+
+  @Test
+  public void join() throws Exception {
+    try {
+      test("ALTER SESSION SET `planner.slice_target` = 1");
+      test("CREATE TABLE dfs_test.tmp.lineitem AS SELECT * FROM 
cp.`tpch/lineitem.parquet`");
+      test("CREATE TABLE dfs_test.tmp.orders AS select * FROM 
cp.`tpch/orders.parquet`");
+      test("ANALYZE TABLE dfs_test.tmp.lineitem COMPUTE STATISTICS FOR ALL 
COLUMNS");
+      test("ANALYZE TABLE dfs_test.tmp.orders COMPUTE STATISTICS FOR ALL 
COLUMNS");
+      test("SELECT * FROM dfs_test.tmp.`lineitem/.stats.drill`");
+      test("SELECT * FROM dfs_test.tmp.`orders/.stats.drill`");
+
+      test("SELECT * FROM dfs_test.tmp.`lineitem` l JOIN dfs_test.tmp.`orders` 
o ON l.l_orderkey = o.o_orderkey");
+    } finally {
+      test("ALTER SESSION SET `planner.slice_target` = " + 
ExecConstants.SLICE_TARGET_DEFAULT);
+    }
+  }
+}
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 67a3bb8..32c2a90 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -172,6 +172,10 @@
           <groupId>sqlline</groupId>
           <artifactId>sqlline</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>stream</artifactId>
+          <groupId>com.clearspring.analytics</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/Analyze.java b/logical/src/main/java/org/apache/drill/common/logical/data/Analyze.java
new file mode 100644
index 0000000..711050d
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/Analyze.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("analyze")
+public class Analyze extends SingleInputOperator {
+
+  @JsonCreator
+  public Analyze() { }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitAnalyze(this, value);
+  }
+}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
index 482146f..c46dc43 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
@@ -19,6 +19,7 @@ package org.apache.drill.common.logical.data.visitors;
 
 import org.apache.drill.common.logical.data.LateralJoin;
 import org.apache.drill.common.logical.data.Unnest;
+import org.apache.drill.common.logical.data.Analyze;
 import org.apache.drill.common.logical.data.Values;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.Flatten;
@@ -46,6 +47,11 @@ public abstract class AbstractLogicalVisitor<T, X, E extends Throwable> implemen
     }
 
     @Override
+    public T visitAnalyze(Analyze analyze, X value) throws E {
+      return visitOp(analyze, value);
+    }
+
+    @Override
     public T visitScan(Scan scan, X value) throws E {
         return visitOp(scan, value);
     }
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
index 9d9013e..ee9036c 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
@@ -20,6 +20,7 @@ package org.apache.drill.common.logical.data.visitors;
 
 import org.apache.drill.common.logical.data.LateralJoin;
 import org.apache.drill.common.logical.data.Unnest;
+import org.apache.drill.common.logical.data.Analyze;
 import org.apache.drill.common.logical.data.Values;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.Flatten;
@@ -51,6 +52,7 @@ public interface LogicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
     public RETURN visitGroupingAggregate(GroupingAggregate groupBy, EXTRA value) throws EXCEP;
     public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
     public RETURN visitFlatten(Flatten flatten, EXTRA value) throws EXCEP;
+    public RETURN visitAnalyze(Analyze analyze, EXTRA value) throws EXCEP;
 
     public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
     public RETURN visitValues(Values constant, EXTRA value) throws EXCEP;
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index c540c8f..635f972 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -601,6 +601,14 @@ public final class UserBitShared {
      * <code>SYSLOG_SUB_SCAN = 58;</code>
      */
     SYSLOG_SUB_SCAN(58, 58),
+    /**
+     * <code>STATISTICS_AGGREGATE = 59;</code>
+     */
+    STATISTICS_AGGREGATE(59, 59),
+    /**
+     * <code>UNPIVOT_MAPS = 60;</code>
+     */
+    UNPIVOT_MAPS(60, 60),
     ;
 
     /**
@@ -839,6 +847,14 @@ public final class UserBitShared {
      * <code>SYSLOG_SUB_SCAN = 58;</code>
      */
     public static final int SYSLOG_SUB_SCAN_VALUE = 58;
+    /**
+     * <code>STATISTICS_AGGREGATE = 59;</code>
+     */
+    public static final int STATISTICS_AGGREGATE_VALUE = 59;
+    /**
+     * <code>UNPIVOT_MAPS = 60;</code>
+     */
+    public static final int UNPIVOT_MAPS_VALUE = 60;
 
 
     public final int getNumber() { return value; }
@@ -904,6 +920,8 @@ public final class UserBitShared {
         case 56: return RUNTIME_FILTER;
         case 57: return ROWKEY_JOIN;
         case 58: return SYSLOG_SUB_SCAN;
+        case 59: return STATISTICS_AGGREGATE;
+        case 60: return UNPIVOT_MAPS;
         default: return null;
       }
     }
@@ -24644,7 +24662,7 @@ public final class UserBitShared {
       "entState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCA" +
       
"TION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCAN"
 +
       "CELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQ" 
+
-      "UESTED\020\006*\247\t\n\020CoreOperatorType\022\021\n\rSINGLE_" +
+      "UESTED\020\006*\323\t\n\020CoreOperatorType\022\021\n\rSINGLE_" +
       "SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER" 
+
       
"\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n"
 +
       "\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006" +
@@ -24674,10 +24692,11 @@ public final class UserBitShared {
       "\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PARTITION_LIMI" +
       "T\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016RUNTIME_FILT" +
       "ER\0208\022\017\n\013ROWKEY_JOIN\0209\022\023\n\017SYSLOG_SUB_SCAN" +
-      "\020:*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSA" +
-      "SL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL" +
-      "_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apach" +
-      "e.drill.exec.protoB\rUserBitSharedH\001"
+      "\020:\022\030\n\024STATISTICS_AGGREGATE\020;\022\020\n\014UNPIVOT_" +
+      "MAPS\020<*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016" +
+      "\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014" +
+      "SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.a" +
+      "pache.drill.exec.protoB\rUserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 7d5041c..051f82f 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -80,7 +80,9 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     PCAPNG_SUB_SCAN(55),
     RUNTIME_FILTER(56),
     ROWKEY_JOIN(57),
-    SYSLOG_SUB_SCAN(58);
+    SYSLOG_SUB_SCAN(58),
+    STATISTICS_AGGREGATE(59),
+    UNPIVOT_MAPS(60);
     
     public final int number;
     
@@ -157,6 +159,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 56: return RUNTIME_FILTER;
             case 57: return ROWKEY_JOIN;
             case 58: return SYSLOG_SUB_SCAN;
+            case 59: return STATISTICS_AGGREGATE;
+            case 60: return UNPIVOT_MAPS;
             default: return null;
         }
     }
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index a0438b7..ca4e273 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -351,6 +351,8 @@ enum CoreOperatorType {
   RUNTIME_FILTER = 56;
   ROWKEY_JOIN = 57;
   SYSLOG_SUB_SCAN = 58;
+  STATISTICS_AGGREGATE = 59;
+  UNPIVOT_MAPS = 60;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of function signatures.
