This is an automated email from the ASF dual-hosted git repository.

dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f6ab84a7bf HIVE-29203: get_aggr_stats_for doesn't aggregate stats when direct sql… (#6089)
6f6ab84a7bf is described below

commit 6f6ab84a7bff723554f1edd689781a9aac800919
Author: ramitg254 <[email protected]>
AuthorDate: Fri Jan 23 12:27:41 2026 +0530

    HIVE-29203: get_aggr_stats_for doesn't aggregate stats when direct sql… (#6089)
---
 .../hadoop/hive/metastore/DirectSqlAggrStats.java  | 620 ++++++++++++++++++++
 .../hadoop/hive/metastore/DirectSqlUpdatePart.java |   2 +-
 .../hive/metastore/IExtrapolatePartStatus.java     |  34 ++
 .../hadoop/hive/metastore/MetaStoreDirectSql.java  | 630 +--------------------
 .../hive/metastore/MetastoreDirectSqlUtils.java    |  71 +++
 .../apache/hadoop/hive/metastore/ObjectStore.java  |   9 +-
 .../hadoop/hive/metastore/TestObjectStore.java     | 111 +++-
 7 files changed, 834 insertions(+), 643 deletions(-)
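
For readers skimming the patch below: the core of the fix is that per-column aggregate rows coming back from separate direct-SQL batches (columns and partitions are fetched in batches to stay under database IN-list limits) are now merged per column before the NDV adjustment, instead of each batch's partial row being treated as a final result. A minimal, hypothetical Java sketch of that merge shape — the class name and slot layout here are illustrative, not the patch's code:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: fold per-batch aggregate rows keyed by column name,
    // the way columnWiseStatsMerger in this commit combines batched results.
    final class BatchMergeSketch {
      // Assumed layout: row[0] = column name, row[1] = low, row[2] = high, row[3] = count.
      static Map<String, Object[]> mergeRows(List<Object[]> batchedRows) {
        Map<String, Object[]> merged = new HashMap<>();
        for (Object[] row : batchedRows) {
          merged.merge((String) row[0], row, BatchMergeSketch::combine);
        }
        return merged;
      }

      // Combine two partial rows for the same column: min of lows, max of highs,
      // sum of per-batch partition counts.
      private static Object[] combine(Object[] a, Object[] b) {
        a[1] = Math.min(((Number) a[1]).longValue(), ((Number) b[1]).longValue());
        a[2] = Math.max(((Number) a[2]).longValue(), ((Number) b[2]).longValue());
        a[3] = ((Number) a[3]).longValue() + ((Number) b[3]).longValue();
        return a;
      }
    }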

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlAggrStats.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlAggrStats.java
new file mode 100644
index 00000000000..9e1d4273060
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlAggrStats.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.COLNAME;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.COLTYPE;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.LONG_LOW_VALUE;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.LONG_HIGH_VALUE;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.DOUBLE_LOW_VALUE;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.DOUBLE_HIGH_VALUE;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.BIG_DECIMAL_LOW_VALUE;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.BIG_DECIMAL_HIGH_VALUE;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.NUM_NULLS;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.NUM_DISTINCTS;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.AVG_COL_LEN;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.MAX_COL_LEN;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.NUM_TRUES;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.NUM_FALSES;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.SUM_NDV_LONG;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.SUM_NDV_DOUBLE;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.SUM_NDV_DECIMAL;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.COUNT_ROWS;
+import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.SUM_NUM_DISTINCTS;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.getFullyQualifiedName;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.makeParams;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.executeWithArray;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.prepareParams;
+
+class DirectSqlAggrStats {
+  private static final int NO_BATCHING = -1;
+  private static final int DETECT_BATCHING = 0;
+
+  private static final Logger LOG = LoggerFactory.getLogger(DirectSqlAggrStats.class);
+  private final PersistenceManager pm;
+  private final int batchSize;
+
+  @java.lang.annotation.Target(java.lang.annotation.ElementType.FIELD)
+  @java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
+  private @interface TableName {
+  }
+
+  @TableName
+  private String DBS, TBLS, PARTITIONS, PART_COL_STATS, TAB_COL_STATS;
+
+  public DirectSqlAggrStats(PersistenceManager pm, Configuration conf, String schema) {
+    this.pm = pm;
+    DatabaseProduct dbType = PersistenceManagerProvider.getDatabaseProduct();
+    int configBatchSize =
+        MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE);
+    if (configBatchSize == DETECT_BATCHING) {
+      configBatchSize = dbType.needsInBatching() ? 1000 : NO_BATCHING;
+    }
+    this.batchSize = configBatchSize;
+    ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder =
+        new ImmutableMap.Builder<>();
+
+    for (java.lang.reflect.Field f : this.getClass().getDeclaredFields()) {
+      if (f.getAnnotation(DirectSqlAggrStats.TableName.class) == null) {
+        continue;
+      }
+      try {
+        String value = getFullyQualifiedName(schema, f.getName());
+        f.set(this, value);
+        fieldNameToTableNameBuilder.put(f.getName(), value);
+      } catch (IllegalArgumentException | IllegalAccessException e) {
+        throw new RuntimeException("Internal error, cannot set " + f.getName());
+      }
+    }
+  }
+
+  /**
+   * Retrieve the column statistics for the specified columns of the table. NULL
+   * is returned if the columns are not provided.
+   *
+   * @param catName     the catalog name of the table
+   * @param dbName      the database name of the table
+   * @param tableName   the table name
+   * @param colNames    the list of the column names
+   * @param engine      engine making the request
+   * @return            the column statistics for the specified columns
+   * @throws MetaException
+   */
+  public ColumnStatistics getTableStats(final String catName,
+                                        final String dbName,
+                                        final String tableName, List<String> colNames,
+                                        String engine, boolean enableBitVector,
+                                        boolean enableKll) throws MetaException {
+    if (colNames == null || colNames.isEmpty()) {
+      return null;
+    }
+    final boolean doTrace = LOG.isDebugEnabled();
+    final String queryText0 = "select " + getStatsList(enableBitVector, 
enableKll) + " from " + TAB_COL_STATS
+        + " inner join " + TBLS + " on " + TAB_COL_STATS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\" "
+        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\" "
+        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ?"
+        + " and \"ENGINE\" = ? and \"COLUMN_NAME\" in (";
+    Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
+      @Override
+      public List<Object[]> run(List<String> input) throws MetaException {
+        Object[] params = new Object[input.size() + 4];
+        params[0] = catName;
+        params[1] = dbName;
+        params[2] = tableName;
+        params[3] = engine;
+        for (int i = 0; i < input.size(); ++i) {
+          params[i + 4] = input.get(i);
+        }
+        String queryText = queryText0 + makeParams(input.size()) + ")";
+        long start = doTrace ? System.nanoTime() : 0;
+        Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+        try {
+          Object qResult = executeWithArray(query, params, queryText);
+          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0 + "...)", start,
+              (doTrace ? System.nanoTime() : 0));
+          if (qResult == null) {
+            return null;
+          }
+          return MetastoreDirectSqlUtils.ensureList(qResult);
+        } finally {
+          addQueryAfterUse(query);
+        }
+      }
+    };
+    List<Object[]> list;
+    try {
+      list = Batchable.runBatched(batchSize, colNames, b);
+      if (list != null) {
+        list = new ArrayList<>(list);
+      }
+    } finally {
+      b.closeAllQueries();
+    }
+
+    if (list == null || list.isEmpty()) {
+      return null;
+    }
+    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
+    csd.setCatName(catName);
+    return makeColumnStats(list, csd, 0, engine);
+  }
+
+  public List<ColumnStatisticsObj> columnStatisticsObjForPartitions(
+      final String catName, final String dbName,
+      final String tableName, final List<String> partNames,
+      List<String> colNames, String engine, long partsFound,
+      final boolean useDensityFunctionForNDVEstimation,
+      final double ndvTuner, final boolean enableBitVector,
+      boolean enableKll) throws MetaException {
+    final boolean areAllPartsFound = (partsFound == partNames.size());
+    return Batchable.runBatched(batchSize, colNames, new Batchable<String, 
ColumnStatisticsObj>() {
+      @Override
+      public List<ColumnStatisticsObj> run(final List<String> inputColNames) 
throws MetaException {
+        return columnStatisticsObjForPartitionsBatch(catName, dbName, 
tableName,
+            partNames, inputColNames, engine, areAllPartsFound,
+            useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, 
enableKll);
+      }
+    });
+  }
+
+  /**
+   * Should be called with the list short enough to not trip up Oracle/etc.
+   */
+  private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(
+      String catName,
+      String dbName, String tableName,
+      List<String> partNames, List<String> colNames, String engine,
+      boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation,
+      double ndvTuner, boolean enableBitVector,
+      boolean enableKll) throws MetaException {
+    if (enableBitVector || enableKll) {
+      return aggrStatsUseJava(catName, dbName, tableName, partNames,
+          colNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation,
+          ndvTuner, enableBitVector, enableKll);
+    } else {
+      return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, engine,
+          useDensityFunctionForNDVEstimation, ndvTuner);
+    }
+  }
+
+  private List<ColumnStatisticsObj> aggrStatsUseJava(
+      String catName, String dbName, String tableName,
+      List<String> partNames, List<String> colNames, String engine, boolean 
areAllPartsFound,
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean 
enableBitVector,
+      boolean enableKll) throws MetaException {
+    // 1. get all the stats for colNames in partNames;
+    List<ColumnStatistics> partStats =
+        getPartitionStats(catName, dbName, tableName, partNames, colNames, 
engine, enableBitVector, enableKll);
+    // 2. use util function to aggr stats
+    return MetaStoreServerUtils.aggrPartitionStats(partStats, catName, dbName, 
tableName, partNames, colNames,
+        areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
+  }
+
+  private List<ColumnStatisticsObj> aggrStatsUseDB(
+      String catName, String dbName, String tableName,
+      List<String> partNames, List<String> colNames, String engine,
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner)
+      throws MetaException {
+    // TODO: all the extrapolation logic should be moved out of this class,
+    // only mechanical data retrieval should remain here.
+    String queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), 
min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), 
max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), 
sum(\"NUM_FALSES\"), "
+        // The following data is used to compute a partitioned table's NDV 
based
+        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. 
Global NDVs cannot be
+        // accurately derived from partition NDVs, because the domain of 
column value two partitions
+        // can overlap. If there is no overlap then global NDV is just the sum
+        // of partition NDVs (UpperBound). But if there is some overlay then
+        // global NDV can be anywhere between sum of partition NDVs (no 
overlap)
+        // and same as one of the partition NDV (domain of column value in all 
other
+        // partitions is subset of the domain value in one of the partition)
+        // (LowerBound).But under uniform distribution, we can roughly 
estimate the global
+        // NDV by leveraging the min/max values.
+        // And, we also guarantee that the estimation makes sense by comparing 
it to the
+        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
+        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+        + "sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" 
as decimal)),"
+        + 
"sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+        + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+        + "count(1),"
+        + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
+        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
+        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
+        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ? "
+        + " and \"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS + 
".\"PART_NAME\" in (%2$s)"
+        + " and \"ENGINE\" = ? " + " group by \"COLUMN_NAME\", 
\"COLUMN_TYPE\"";
+
+    boolean doTrace = LOG.isDebugEnabled();
+
+    List<ColumnStatisticsObj> colStats = new ArrayList<>(colNames.size());
+    List<Object[]> partialStatsRows = new ArrayList<>(colNames.size());
+    columnWiseStatsMerger(queryText, catName, dbName, tableName,
+        colNames, partNames, colStats, partialStatsRows,
+        engine, useDensityFunctionForNDVEstimation, ndvTuner, doTrace);
+
+    // Extrapolation is needed for partialStatsRows.
+    if (partialStatsRows.size() != 0) {
+      Map<String, Integer> indexMap = new HashMap<String, Integer>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+
+      for (Object[] row : partialStatsRows) {
+        String colName = row[COLNAME.idx()].toString();
+        String colType = row[COLTYPE.idx()].toString();
+        BigDecimal countVal = new BigDecimal(row[COUNT_ROWS.idx()].toString());
+
+        // use linear extrapolation. more complicated one can be added in the
+        // future.
+        IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
+        // fill in colstatus
+        Integer[] index;
+        boolean decimal = false;
+        if (colType.toLowerCase().startsWith("decimal")) {
+          index = IExtrapolatePartStatus.indexMaps.get("decimal");
+          decimal = true;
+        } else {
+          index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
+        }
+        // if the colType is not the known type, long, double, etc, then get
+        // all index.
+        if (index == null) {
+          index = IExtrapolatePartStatus.indexMaps.get("default");
+        }
+
+        for (int colStatIndex : index) {
+          String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
+          // if the aggregation type is sum, we do a scale-up
+          if (
+              IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum
+          ) {
+            // +3 only for the case of SUM_NUM_DISTINCTS which is after count rows index
+            int rowIndex = (colStatIndex == 15) ? colStatIndex + 3 : colStatIndex + 2;
+            if (row[rowIndex] != null) {
+              Long val = MetastoreDirectSqlUtils.extractSqlLong(row[rowIndex]);
+              row[rowIndex] = val / countVal.longValue() * (partNames.size());
+            }
+          } else if (
+              IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min ||
+                  IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max
+          ) {
+            // if the aggregation type is min/max, we extrapolate from the
+            // left/right borders
+            String orderByExpr =
+                decimal ? "cast(\"" + colStatName + "\" as decimal)" : "\"" + colStatName + "\"";
+
+            queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + 
PART_COL_STATS
+                + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+                + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = 
" + TBLS + ".\"TBL_ID\""
+                + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS 
+ ".\"DB_ID\""
+                + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
+                + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
+                + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
+                + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+                + " order by " + orderByExpr;
+
+            Batchable<String, Object[]> columnWisePartitionBatches =
+                columnWisePartitionBatcher(queryText, catName, dbName, tableName, partNames, engine, doTrace);
+            try {
+              List<Object[]> list =
+                  Batchable.runBatched(batchSize, Collections.singletonList(colName), columnWisePartitionBatches);
+              Object[] min = list.getFirst();
+              Object[] max = list.getLast();
+              if (batchSize > 0) {
+                for (int i = Math.min(batchSize - 1, list.size() - 1); i < 
list.size(); i += batchSize) {
+                  Object[] posMax = list.get(i);
+                  if (new BigDecimal(max[0].toString()).compareTo(new 
BigDecimal(posMax[0].toString())) < 0) {
+                    max = posMax;
+                  }
+                  int j = i + 1;
+                  if (j < list.size()) {
+                    Object[] posMin = list.get(j);
+                    if (new BigDecimal(min[0].toString()).compareTo(new 
BigDecimal(posMin[0].toString())) > 0) {
+                      min = posMin;
+                    }
+                  }
+                }
+              }
+              if (min[0] == null || max[0] == null) {
+                row[2 + colStatIndex] = null;
+              } else {
+                row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, 
max, colStatIndex, indexMap);
+              }
+            } finally {
+              columnWisePartitionBatches.closeAllQueries();
+            }
+          }
+        }
+        colStats.add(prepareCSObjWithAdjustedNDV
+            (row, useDensityFunctionForNDVEstimation, ndvTuner));
+        Deadline.checkTimeout();
+      }
+    }
+    return colStats;
+  }
+
+  private void columnWiseStatsMerger(
+      final String queryText, final String catName, final String dbName,
+      final String tableName, final List<String> colNames, final List<String> 
partNames,
+      final List<ColumnStatisticsObj> colStats, final List<Object[]> 
partialStatsRows, final String engine,
+      final boolean useDensityFunctionForNDVEstimation, final double ndvTuner,
+      final boolean doTrace
+  ) throws MetaException {
+    Batchable<String, Object[]> columnWisePartitionBatches =
+        columnWisePartitionBatcher(queryText, catName, dbName, tableName, partNames, engine, doTrace);
+    try {
+      List<Object[]> unmergedColStatslist = Batchable.runBatched(batchSize, 
colNames, columnWisePartitionBatches);
+      Map<String, Object[]> mergedColStatsMap = new HashMap<>();
+      for (Object[] unmergedRow : unmergedColStatslist) {
+        String colName = (String) unmergedRow[0];
+        Object[] mergedRow = mergedColStatsMap.getOrDefault(colName, new Object[21]);
+        mergeBackendDBStats(mergedRow, unmergedRow);
+        mergedColStatsMap.put(colName, mergedRow);
+      }
+
+      for (Map.Entry<String, Object[]> entry : mergedColStatsMap.entrySet()) {
+        BigDecimal partCount = new BigDecimal(entry.getValue()[COUNT_ROWS.idx()].toString());
+        if (partCount.equals(new BigDecimal(partNames.size())) || partCount.longValue() < 2) {
+          colStats.add(
+              prepareCSObjWithAdjustedNDV(entry.getValue(), useDensityFunctionForNDVEstimation, ndvTuner));
+        } else {
+          partialStatsRows.add(entry.getValue());
+        }
+        Deadline.checkTimeout();
+      }
+    } finally {
+      columnWisePartitionBatches.closeAllQueries();
+    }
+  }
+
+  private void mergeBackendDBStats(Object[] row1, Object[] row2) {
+    if (row1[COLNAME.idx()] == null) {
+      row1[COLNAME.idx()] = row2[COLNAME.idx()];
+      row1[COLTYPE.idx()] = row2[COLTYPE.idx()];
+    }
+    row1[LONG_LOW_VALUE.idx()] = MetastoreDirectSqlUtils.min(row1[LONG_LOW_VALUE.idx()], row2[LONG_LOW_VALUE.idx()]);
+    row1[LONG_HIGH_VALUE.idx()] = MetastoreDirectSqlUtils.max(row1[LONG_HIGH_VALUE.idx()], row2[LONG_HIGH_VALUE.idx()]);
+    row1[DOUBLE_LOW_VALUE.idx()] = MetastoreDirectSqlUtils.min(row1[DOUBLE_LOW_VALUE.idx()], row2[DOUBLE_LOW_VALUE.idx()]);
+    row1[DOUBLE_HIGH_VALUE.idx()] = MetastoreDirectSqlUtils.max(row1[DOUBLE_HIGH_VALUE.idx()], row2[DOUBLE_HIGH_VALUE.idx()]);
+    row1[BIG_DECIMAL_LOW_VALUE.idx()] = MetastoreDirectSqlUtils.min(row1[BIG_DECIMAL_LOW_VALUE.idx()], row2[BIG_DECIMAL_LOW_VALUE.idx()]);
+    row1[BIG_DECIMAL_HIGH_VALUE.idx()] = MetastoreDirectSqlUtils.max(row1[BIG_DECIMAL_HIGH_VALUE.idx()], row2[BIG_DECIMAL_HIGH_VALUE.idx()]);
+    row1[NUM_NULLS.idx()] = MetastoreDirectSqlUtils.sum(row1[NUM_NULLS.idx()], row2[NUM_NULLS.idx()]);
+    row1[NUM_DISTINCTS.idx()] = MetastoreDirectSqlUtils.max(row1[NUM_DISTINCTS.idx()], row2[NUM_DISTINCTS.idx()]);
+    row1[AVG_COL_LEN.idx()] = MetastoreDirectSqlUtils.max(row1[AVG_COL_LEN.idx()], row2[AVG_COL_LEN.idx()]);
+    row1[MAX_COL_LEN.idx()] = MetastoreDirectSqlUtils.max(row1[MAX_COL_LEN.idx()], row2[MAX_COL_LEN.idx()]);
+    row1[NUM_TRUES.idx()] = MetastoreDirectSqlUtils.sum(row1[NUM_TRUES.idx()], row2[NUM_TRUES.idx()]);
+    row1[NUM_FALSES.idx()] = MetastoreDirectSqlUtils.sum(row1[NUM_FALSES.idx()], row2[NUM_FALSES.idx()]);
+    row1[SUM_NDV_LONG.idx()] = MetastoreDirectSqlUtils.sum(row1[SUM_NDV_LONG.idx()], row2[SUM_NDV_LONG.idx()]);
+    row1[SUM_NDV_DOUBLE.idx()] = MetastoreDirectSqlUtils.sum(row1[SUM_NDV_DOUBLE.idx()], row2[SUM_NDV_DOUBLE.idx()]);
+    row1[SUM_NDV_DECIMAL.idx()] = MetastoreDirectSqlUtils.sum(row1[SUM_NDV_DECIMAL.idx()], row2[SUM_NDV_DECIMAL.idx()]);
+    row1[COUNT_ROWS.idx()] = MetastoreDirectSqlUtils.sum(row1[COUNT_ROWS.idx()], row2[COUNT_ROWS.idx()]);
+    row1[SUM_NUM_DISTINCTS.idx()] = MetastoreDirectSqlUtils.sum(row1[SUM_NUM_DISTINCTS.idx()], row2[SUM_NUM_DISTINCTS.idx()]);
+  }
+
+  private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(
+      Object[] row,
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner)
+      throws MetaException {
+    if (row == null) {
+      return null;
+    }
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[COLNAME.idx()], (String) row[COLTYPE.idx()], data);
+    Object avgLong = MetastoreDirectSqlUtils.divide(row[SUM_NDV_LONG.idx()], row[COUNT_ROWS.idx()]);
+    Object avgDouble = MetastoreDirectSqlUtils.divide(row[SUM_NDV_DOUBLE.idx()], row[COUNT_ROWS.idx()]);
+    Object avgDecimal = MetastoreDirectSqlUtils.divide(row[SUM_NDV_DECIMAL.idx()], row[COUNT_ROWS.idx()]);
+    StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, row[LONG_LOW_VALUE.idx()],
+        row[LONG_HIGH_VALUE.idx()], row[DOUBLE_LOW_VALUE.idx()], row[DOUBLE_HIGH_VALUE.idx()], row[BIG_DECIMAL_LOW_VALUE.idx()], row[BIG_DECIMAL_HIGH_VALUE.idx()],
+        row[NUM_NULLS.idx()], row[NUM_DISTINCTS.idx()], row[AVG_COL_LEN.idx()], row[MAX_COL_LEN.idx()], row[NUM_TRUES.idx()], row[NUM_FALSES.idx()],
+        avgLong, avgDouble, avgDecimal, row[SUM_NUM_DISTINCTS.idx()],
+        useDensityFunctionForNDVEstimation, ndvTuner);
+    return cso;
+  }
+
+  private ColumnStatisticsObj prepareCSObj(Object[] row, int i) throws MetaException {
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++], (String)row[i++], data);
+    Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++],
+        declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++], bitVector = row[i++],
+        histogram = row[i++], avglen = row[i++], maxlen = row[i++], trues = row[i++], falses = row[i];
+    StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data,
+        llow, lhigh, dlow, dhigh, declow, dechigh, nulls, dist, bitVector, histogram, avglen, maxlen, trues, falses);
+    return cso;
+  }
+
+  private Batchable<String, Object[]> columnWisePartitionBatcher(
+      final String queryText0, final String catName, final String dbName,
+      final String tableName, final List<String> partNames, final String 
engine,
+      final boolean doTrace) {
+    return new Batchable<String, Object[]>() {
+      @Override
+      public List<Object[]> run(final List<String> inputColNames)
+          throws MetaException {
+        Batchable<String, Object[]> partitionBatchesFetcher = new 
Batchable<String, Object[]>() {
+          @Override
+          public List<Object[]> run(List<String> inputPartNames)
+              throws MetaException {
+            String queryText =
+                String.format(queryText0, makeParams(inputColNames.size()), makeParams(inputPartNames.size()));
+            long start = doTrace ? System.nanoTime() : 0;
+            Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
+            try {
+              Object qResult = executeWithArray(query,
+                  prepareParams(catName, dbName, tableName, inputPartNames, inputColNames, engine), queryText);
+              long end = doTrace ? System.nanoTime() : 0;
+              MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start, end);
+              if (qResult == null) {
+                return Collections.emptyList();
+              }
+              return MetastoreDirectSqlUtils.ensureList(qResult);
+            } finally {
+              addQueryAfterUse(query);
+            }
+          }
+        };
+        try {
+          return Batchable.runBatched(batchSize, partNames, partitionBatchesFetcher);
+        } finally {
+          addQueryAfterUse(partitionBatchesFetcher);
+        }
+      }
+    };
+  }
+
+  public List<MetaStoreServerUtils.ColStatsObjWithSourceInfo> 
getColStatsForAllTablePartitions(
+      String catName, String dbName,
+      boolean enableBitVector, boolean enableKll) throws MetaException {
+    String queryText = "select \"TBLS\".\"TBL_NAME\", 
\"PARTITIONS\".\"PART_NAME\", "
+        + getStatsList(enableBitVector, enableKll)
+        + " from " + PART_COL_STATS
+        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
+        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
+        + " where " + DBS + ".\"NAME\" = ? and " + DBS + ".\"CTLG_NAME\" = ?";
+    long start = 0;
+    long end = 0;
+    boolean doTrace = LOG.isDebugEnabled();
+    Object qResult = null;
+    start = doTrace ? System.nanoTime() : 0;
+    List<MetaStoreServerUtils.ColStatsObjWithSourceInfo> colStatsForDB = new 
ArrayList<>();
+    try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+      qResult = executeWithArray(query.getInnerQuery(), new Object[] { dbName, 
catName }, queryText);
+      if (qResult == null) {
+        return colStatsForDB;
+      }
+      end = doTrace ? System.nanoTime() : 0;
+      MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
+      List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
+      for (Object[] row : list) {
+        String tblName = (String) row[0];
+        String partName = (String) row[1];
+        ColumnStatisticsObj colStatObj = prepareCSObj(row, 2);
+        colStatsForDB.add(
+            new MetaStoreServerUtils.ColStatsObjWithSourceInfo(colStatObj, catName, dbName, tblName, partName));
+        Deadline.checkTimeout();
+      }
+    }
+    return colStatsForDB;
+  }
+
+  public List<ColumnStatistics> getPartitionStats(
+      final String catName, final String dbName, final String tableName, final List<String> partNames,
+      List<String> colNames, String engine, boolean enableBitVector, boolean enableKll) throws MetaException {
+    if (colNames.isEmpty() || partNames.isEmpty()) {
+      return Collections.emptyList();
+    }
+    final boolean doTrace = LOG.isDebugEnabled();
+    final String queryText0 = "select \"PARTITIONS\".\"PART_NAME\", " + 
getStatsList(enableBitVector, enableKll)
+        + " from " + PART_COL_STATS
+        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
+        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
+        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ? "
+        + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
+        + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
+        + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+        + " order by " + PARTITIONS +  ".\"PART_NAME\"";
+    Batchable<String, Object[]> b =
+        columnWisePartitionBatcher(queryText0, catName, dbName, tableName, partNames, engine, doTrace);
+    List<ColumnStatistics> result = new ArrayList<>(partNames.size());
+    String lastPartName = null;
+    int from = 0;
+    try {
+      List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
+      for (int i = 0; i <= list.size(); ++i) {
+        boolean isLast = i == list.size();
+        String partName = isLast ? null : (String) list.get(i)[0];
+        if (!isLast && partName.equals(lastPartName)) {
+          continue;
+        } else if (from != i) {
+          ColumnStatisticsDesc csd =
+              new ColumnStatisticsDesc(false, dbName, tableName);
+          csd.setCatName(catName);
+          csd.setPartName(lastPartName);
+          result.add(makeColumnStats(list.subList(from, i), csd, 1, engine));
+        }
+        lastPartName = partName;
+        from = i;
+        Deadline.checkTimeout();
+      }
+    } finally {
+      b.closeAllQueries();
+    }
+    return result;
+  }
+
+  private ColumnStatistics makeColumnStats(
+      List<Object[]> list, ColumnStatisticsDesc csd,
+      int offset, String engine) throws MetaException {
+    ColumnStatistics result = new ColumnStatistics();
+    result.setStatsDesc(csd);
+    List<ColumnStatisticsObj> csos = new ArrayList<>(list.size());
+    for (Object[] row : list) {
+      // LastAnalyzed is stored per column but thrift has it per several;
+      // get the lowest for now as nobody actually uses this field.
+      Object laObj = row[offset + 16]; // 16 is the offset of "last analyzed" field
+      if (laObj != null && (!csd.isSetLastAnalyzed() ||
+          csd.getLastAnalyzed() > MetastoreDirectSqlUtils.extractSqlLong(laObj))) {
+        csd.setLastAnalyzed(MetastoreDirectSqlUtils.extractSqlLong(laObj));
+      }
+      csos.add(prepareCSObj(row, offset));
+      Deadline.checkTimeout();
+    }
+    result.setStatsObj(csos);
+    result.setEngine(engine);
+    return result;
+  }
+
+  /**
+   * The common query part for table and partition stats
+   */
+  private String getStatsList(boolean enableBitVector, boolean enableKll) {
+    return "\"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", 
\"LONG_HIGH_VALUE\", "
+        + "\"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\", 
\"BIG_DECIMAL_LOW_VALUE\", "
+        + "\"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", \"NUM_DISTINCTS\", "
+        + (enableBitVector ? "\"BIT_VECTOR\", " : "\'\', ")
+        + (enableKll ? "\"HISTOGRAM\", " : "\'\', ")
+        + "\"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\", 
\"LAST_ANALYZED\" ";
+  }
+}
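
A note on the min/max extrapolation path above: each partition batch is ordered independently by the database, so the concatenated result list is sorted only within batches. The loop starting at "if (batchSize > 0)" therefore probes every batch boundary (last row of each batch for the max, first row of the next batch for the min) rather than trusting the ends of the whole list. A standalone sketch of that boundary scan, under the assumption that rows hold comparable longs in slot 0:

    import java.util.List;

    // Illustrative only: global min/max over a concatenation of independently
    // sorted batches of size batchSize (the last batch may be shorter).
    final class BatchBoundaryScan {
      static long[] globalMinMax(List<long[]> rows, int batchSize) {
        long[] min = rows.get(0);                // first row of the first batch
        long[] max = rows.get(rows.size() - 1);  // provisional maximum
        for (int i = Math.min(batchSize - 1, rows.size() - 1); i < rows.size(); i += batchSize) {
          long[] batchMax = rows.get(i);         // last row of this batch
          if (batchMax[0] > max[0]) {
            max = batchMax;
          }
          if (i + 1 < rows.size()) {
            long[] batchMin = rows.get(i + 1);   // first row of the next batch
            if (batchMin[0] < min[0]) {
              min = batchMin;
            }
          }
        }
        return new long[] { min[0], max[0] };
      }
    }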
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
index 15924396932..4473067ad09 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
@@ -89,7 +89,7 @@ class DirectSqlUpdatePart {
   private final int maxBatchSize;
   private final SQLGenerator sqlGenerator;
 
-  public DirectSqlUpdatePart(PersistenceManager pm, Configuration conf,
+  DirectSqlUpdatePart(PersistenceManager pm, Configuration conf,
                              DatabaseProduct dbType, int batchSize) {
     this.pm = pm;
     this.conf = conf;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IExtrapolatePartStatus.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IExtrapolatePartStatus.java
index aa50e91076c..4627eebd3be 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IExtrapolatePartStatus.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IExtrapolatePartStatus.java
@@ -22,6 +22,40 @@
 import java.util.Map;
 
 public interface IExtrapolatePartStatus {
+
+  enum DBStatsAggrIndices {
+    COLNAME(0),
+    COLTYPE(1),
+    LONG_LOW_VALUE(2),
+    LONG_HIGH_VALUE(3),
+    DOUBLE_LOW_VALUE(4),
+    DOUBLE_HIGH_VALUE(5),
+    BIG_DECIMAL_LOW_VALUE(6),
+    BIG_DECIMAL_HIGH_VALUE(7),
+    NUM_NULLS(8),
+    NUM_DISTINCTS(9),
+    AVG_COL_LEN(10),
+    MAX_COL_LEN(11),
+    NUM_TRUES(12),
+    NUM_FALSES(13),
+    SUM_NDV_LONG(14),
+    SUM_NDV_DOUBLE(15),
+    SUM_NDV_DECIMAL(16),
+    COUNT_ROWS(17),
+    SUM_NUM_DISTINCTS(18);
+
+    private final int idx;
+
+    DBStatsAggrIndices(int idx) {
+        this.idx = idx;
+    }
+
+    public int idx() {
+        return idx;
+    }
+
+  }
+
   /**
    * The sequence of colStatNames.
    */
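
The DBStatsAggrIndices enum added above names each slot of the aggregated result row, so DirectSqlAggrStats indexes the Object[] by enum rather than by magic offsets. A hedged usage sketch (the row contents and the describe helper are made up for illustration):

    import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.COLNAME;
    import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.COUNT_ROWS;
    import static org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.NUM_NULLS;

    // Illustrative only: read a merged aggregate row by named index.
    final class RowReadSketch {
      static String describe(Object[] row) {
        return row[COLNAME.idx()] + ": nulls=" + row[NUM_NULLS.idx()]
            + ", partitions with stats=" + row[COUNT_ROWS.idx()];
      }
    }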
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 2d22278326a..49d187505dc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -20,7 +20,6 @@
 
 import static org.apache.commons.lang3.StringUtils.join;
 import static org.apache.commons.lang3.StringUtils.normalizeSpace;
-import static org.apache.commons.lang3.StringUtils.repeat;
 import static org.apache.hadoop.hive.metastore.ColumnType.BIGINT_TYPE_NAME;
 import static org.apache.hadoop.hive.metastore.ColumnType.CHAR_TYPE_NAME;
 import static org.apache.hadoop.hive.metastore.ColumnType.DATE_TYPE_NAME;
@@ -30,6 +29,9 @@
 import static org.apache.hadoop.hive.metastore.ColumnType.TIMESTAMP_TYPE_NAME;
 import static org.apache.hadoop.hive.metastore.ColumnType.TINYINT_TYPE_NAME;
 import static org.apache.hadoop.hive.metastore.ColumnType.VARCHAR_TYPE_NAME;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.getFullyQualifiedName;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.makeParams;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.prepareParams;
 
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -63,8 +65,6 @@
 import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.DatabaseType;
@@ -115,7 +115,6 @@
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hive.common.util.BloomFilter;
 import org.datanucleus.store.rdbms.query.ForwardQueryResult;
@@ -167,6 +166,7 @@ class MetaStoreDirectSql {
   private AggregateStatsCache aggrStatsCache;
   private DirectSqlUpdatePart directSqlUpdatePart;
   private DirectSqlInsertPart directSqlInsertPart;
+  private DirectSqlAggrStats directSqlAggrStats;
 
   /**
    * This method returns a comma separated string consisting of String values of a given list.
@@ -206,6 +206,7 @@ public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String sche
     this.batchSize = batchSize;
     this.isTxnStatsEnabled = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED);
     this.directSqlUpdatePart = new DirectSqlUpdatePart(pm, conf, dbType, batchSize);
+    this.directSqlAggrStats = new DirectSqlAggrStats(pm, conf, schema);
     ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder =
         new ImmutableMap.Builder<>();
 
@@ -267,11 +268,6 @@ public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String sche
             .build();
   }
 
-  private static String getFullyQualifiedName(String schema, String tblName) {
-    return ((schema == null || schema.isEmpty()) ? "" : "\"" + schema + "\".\"")
-        + "\"" + tblName + "\"";
-  }
-
   private boolean ensureDbInit() {
     Transaction tx = pm.currentTransaction();
     boolean doCommit = false;
@@ -1612,73 +1608,6 @@ public void visit(LeafNode node) throws MetaException {
     }
   }
 
-  /**
-   * Retrieve the column statistics for the specified columns of the table. NULL
-   * is returned if the columns are not provided.
-   * @param catName     the catalog name of the table
-   * @param dbName      the database name of the table
-   * @param tableName   the table name
-   * @param colNames    the list of the column names
-   * @param engine      engine making the request
-   * @return            the column statistics for the specified columns
-   * @throws MetaException
-   */
-  public ColumnStatistics getTableStats(final String catName, final String dbName,
-      final String tableName, List<String> colNames, String engine,
-      boolean enableBitVector, boolean enableKll) throws MetaException {
-    if (colNames == null || colNames.isEmpty()) {
-      return null;
-    }
-    final boolean doTrace = LOG.isDebugEnabled();
-    final String queryText0 = "select " + getStatsList(enableBitVector, enableKll) + " from " + TAB_COL_STATS
-          + " inner join " + TBLS + " on " + TAB_COL_STATS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
-          + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
-          + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ?"
-          + " and \"ENGINE\" = ? and \"COLUMN_NAME\" in (";
-    Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
-      @Override
-      public List<Object[]> run(List<String> input) throws MetaException {
-        String queryText = queryText0 + makeParams(input.size()) + ")";
-        Object[] params = new Object[input.size() + 4];
-        params[0] = catName;
-        params[1] = dbName;
-        params[2] = tableName;
-        params[3] = engine;
-        for (int i = 0; i < input.size(); ++i) {
-          params[i + 4] = input.get(i);
-        }
-        long start = doTrace ? System.nanoTime() : 0;
-        Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-        try {
-          Object qResult = executeWithArray(query, params, queryText);
-          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0 + "...)", start, (doTrace ? System.nanoTime() : 0));
-          if (qResult == null) {
-            return null;
-          }
-          return MetastoreDirectSqlUtils.ensureList(qResult);
-        } finally {
-          addQueryAfterUse(query);
-        }
-      }
-    };
-    List<Object[]> list;
-    try {
-      list = Batchable.runBatched(batchSize, colNames, b);
-      if (list != null) {
-        list = new ArrayList<>(list);
-      }
-    } finally {
-      b.closeAllQueries();
-    }
-
-    if (list == null || list.isEmpty()) {
-      return null;
-    }
-    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
-    csd.setCatName(catName);
-    return makeColumnStats(list, csd, 0, engine);
-  }
-
   public List<HiveObjectPrivilege> getTableAllColumnGrants(String catName, 
String dbName,
                                                            String tableName, 
String authorizer) throws MetaException {
     // These constants should match the SELECT clause of the query.
@@ -1796,7 +1725,7 @@ public AggrStats aggrColStatsForPartitions(String catName, String dbName, String
           colNamesForDB.add(colName);
           // Read aggregated stats for one column
           colStatsAggrFromDB =
-              columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNamesForDB, engine,
+              directSqlAggrStats.columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNamesForDB, engine,
                   partsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll);
           if (!colStatsAggrFromDB.isEmpty()) {
             ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0);
@@ -1809,7 +1738,7 @@ public AggrStats aggrColStatsForPartitions(String catName, String dbName, String
     } else {
       partsFound = partsFoundForPartitions(catName, dbName, tableName, partNames, colNames, engine);
       colStatsList =
-          columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNames, engine, partsFound,
+          directSqlAggrStats.columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNames, engine, partsFound,
               useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll);
     }
     LOG.debug("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation
@@ -1876,540 +1805,6 @@ public List<Long> run(List<String> inputPartNames) throws MetaException {
     return partsFound;
   }
 
-  private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(
-      final String catName, final String dbName, final String tableName, final List<String> partNames,
-      List<String> colNames, String engine, long partsFound, final boolean useDensityFunctionForNDVEstimation,
-      final double ndvTuner, final boolean enableBitVector, boolean enableKll) throws MetaException {
-    final boolean areAllPartsFound = (partsFound == partNames.size());
-    return Batchable.runBatched(batchSize, colNames, new Batchable<String, ColumnStatisticsObj>() {
-      @Override
-      public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException {
-        return Batchable.runBatched(batchSize, partNames, new Batchable<String, ColumnStatisticsObj>() {
-          @Override
-          public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException {
-            return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName, inputPartNames,
-                inputColNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner,
-                enableBitVector, enableKll);
-          }
-        });
-      }
-    });
-  }
-
-  public List<ColStatsObjWithSourceInfo> 
getColStatsForAllTablePartitions(String catName, String dbName,
-      boolean enableBitVector, boolean enableKll) throws MetaException {
-    String queryText = "select \"TBLS\".\"TBL_NAME\", 
\"PARTITIONS\".\"PART_NAME\", "
-        + getStatsList(enableBitVector, enableKll)
-        + " from " + PART_COL_STATS
-        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-        + " where " + DBS + ".\"NAME\" = ? and " + DBS + ".\"CTLG_NAME\" = ?";
-    long start = 0;
-    long end = 0;
-    boolean doTrace = LOG.isDebugEnabled();
-    Object qResult = null;
-    start = doTrace ? System.nanoTime() : 0;
-    List<ColStatsObjWithSourceInfo> colStatsForDB = new 
ArrayList<ColStatsObjWithSourceInfo>();
-    try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-      qResult = executeWithArray(query.getInnerQuery(), new Object[] { dbName, 
catName }, queryText);
-      if (qResult == null) {
-        return colStatsForDB;
-      }
-      end = doTrace ? System.nanoTime() : 0;
-      MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-      List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
-      for (Object[] row : list) {
-        String tblName = (String) row[0];
-        String partName = (String) row[1];
-        ColumnStatisticsObj colStatObj = prepareCSObj(row, 2);
-        colStatsForDB.add(new ColStatsObjWithSourceInfo(colStatObj, catName, 
dbName, tblName, partName));
-        Deadline.checkTimeout();
-      }
-    }
-    return colStatsForDB;
-  }
-
-  /** Should be called with the list short enough to not trip up Oracle/etc. */
-  private List<ColumnStatisticsObj> 
columnStatisticsObjForPartitionsBatch(String catName, String dbName,
-      String tableName, List<String> partNames, List<String> colNames, String 
engine,
-      boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, 
double ndvTuner,
-      boolean enableBitVector, boolean enableKll)
-      throws MetaException {
-    if (enableBitVector || enableKll) {
-      return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, 
engine, areAllPartsFound,
-          useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, 
enableKll);
-    } else {
-      return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, 
engine, areAllPartsFound,
-          useDensityFunctionForNDVEstimation, ndvTuner);
-    }
-  }
-
-  private List<ColumnStatisticsObj> aggrStatsUseJava(String catName, String 
dbName, String tableName,
-      List<String> partNames, List<String> colNames, String engine, boolean 
areAllPartsFound,
-      boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean 
enableBitVector,
-      boolean enableKll) throws MetaException {
-    // 1. get all the stats for colNames in partNames;
-    List<ColumnStatistics> partStats =
-        getPartitionStats(catName, dbName, tableName, partNames, colNames, 
engine, enableBitVector, enableKll);
-    // 2. use util function to aggr stats
-    return MetaStoreServerUtils.aggrPartitionStats(partStats, catName, dbName, 
tableName, partNames, colNames,
-        areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
-  }
-
-  private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String 
dbName,
-      String tableName, List<String> partNames, List<String> colNames, String 
engine,
-      boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, 
double ndvTuner) throws MetaException {
-    // TODO: all the extrapolation logic should be moved out of this class,
-    // only mechanical data retrieval should remain here.
-    String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
-        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), 
min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
-        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), 
max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
-        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
-        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), 
sum(\"NUM_FALSES\"), "
-        // The following data is used to compute a partitioned table's NDV 
based
-        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. 
Global NDVs cannot be
-        // accurately derived from partition NDVs, because the domain of 
column value two partitions
-        // can overlap. If there is no overlap then global NDV is just the sum
-        // of partition NDVs (UpperBound). But if there is some overlay then
-        // global NDV can be anywhere between sum of partition NDVs (no 
overlap)
-        // and same as one of the partition NDV (domain of column value in all 
other
-        // partitions is subset of the domain value in one of the partition)
-        // (LowerBound).But under uniform distribution, we can roughly 
estimate the global
-        // NDV by leveraging the min/max values.
-        // And, we also guarantee that the estimation makes sense by comparing 
it to the
-        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
-        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
-        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" 
as decimal)),"
-        + 
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
-        + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
-        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ? ";
-    String queryText = null;
-    long start = 0;
-    long end = 0;
-
-    boolean doTrace = LOG.isDebugEnabled();
-    ForwardQueryResult<?> fqr = null;
-    // Check if the status of all the columns of all the partitions exists
-    // Extrapolation is not needed.
-    if (areAllPartsFound) {
-      queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + 
makeParams(colNames.size()) + ")"
-          + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-          + " and \"ENGINE\" = ? "
-          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-      start = doTrace ? System.nanoTime() : 0;
-      try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-        Object qResult = executeWithArray(query.getInnerQuery(),
-            prepareParams(catName, dbName, tableName, partNames, colNames,
-                engine), queryText);
-        if (qResult == null) {
-          return Collections.emptyList();
-        }
-        end = doTrace ? System.nanoTime() : 0;
-        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
-        List<ColumnStatisticsObj> colStats =
-            new ArrayList<ColumnStatisticsObj>(list.size());
-        for (Object[] row : list) {
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
-              useDensityFunctionForNDVEstimation, ndvTuner));
-          Deadline.checkTimeout();
-        }
-        return colStats;
-      }
-    } else {
-      // Extrapolation is needed for some columns.
-      // In this case, at least a column status for a partition is missing.
-      // We need to extrapolate this partition based on the other partitions
-      List<ColumnStatisticsObj> colStats = new 
ArrayList<ColumnStatisticsObj>(colNames.size());
-      queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", 
count(\"PART_COL_STATS\".\"PART_ID\") "
-          + " from " + PART_COL_STATS
-          + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-          + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-          + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-          + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ? "
-          + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + 
makeParams(colNames.size()) + ")"
-          + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-          + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-          + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + 
PART_COL_STATS + ".\"COLUMN_TYPE\"";
-      start = doTrace ? System.nanoTime() : 0;
-      List<String> noExtraColumnNames = new ArrayList<String>();
-      Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, 
String[]>();
-      try(QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-        Object qResult = executeWithArray(query.getInnerQuery(),
-            prepareParams(catName, dbName, tableName, partNames, colNames,
-                engine), queryText);
-        end = doTrace ? System.nanoTime() : 0;
-        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        if (qResult == null) {
-          return Collections.emptyList();
-        }
-
-        List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
-        for (Object[] row : list) {
-          String colName = (String) row[0];
-          String colType = (String) row[1];
-          // Extrapolation is not needed for this column if
-          // count(\"PARTITION_NAME\")==partNames.size()
-          // Or, extrapolation is not possible for this column if
-          // count(\"PARTITION_NAME\")<2
-          Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
-          if (count == partNames.size() || count < 2) {
-            noExtraColumnNames.add(colName);
-          } else {
-            extraColumnNameTypeParts.put(colName, new String[] {colType, 
String.valueOf(count)});
-          }
-          Deadline.checkTimeout();
-        }
-      }
-      // Extrapolation is not needed for columns noExtraColumnNames
-      List<Object[]> list;
-      if (noExtraColumnNames.size() != 0) {
-        queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
-            + makeParams(noExtraColumnNames.size()) + ")" + " and 
\"PARTITION_NAME\" in ("
-            + makeParams(partNames.size()) + ")"
-            + " and \"ENGINE\" = ? "
-            + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-        start = doTrace ? System.nanoTime() : 0;
-
-        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-          Object qResult = executeWithArray(query.getInnerQuery(),
-              prepareParams(catName, dbName, tableName, partNames, 
noExtraColumnNames, engine), queryText);
-          if (qResult == null) {
-            return Collections.emptyList();
-          }
-          list = MetastoreDirectSqlUtils.ensureList(qResult);
-          for (Object[] row : list) {
-            colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
-                useDensityFunctionForNDVEstimation, ndvTuner));
-            Deadline.checkTimeout();
-          }
-          end = doTrace ? System.nanoTime() : 0;
-          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        }
-      }
-      // Extrapolation is needed for extraColumnNames.
-      // give a sequence number for all the partitions
-      if (extraColumnNameTypeParts.size() != 0) {
-        Map<String, Integer> indexMap = new HashMap<String, Integer>();
-        for (int index = 0; index < partNames.size(); index++) {
-          indexMap.put(partNames.get(index), index);
-        }
-        // get sum for all columns to reduce the number of queries
-        Map<String, Map<Integer, Object>> sumMap = new HashMap<String, 
Map<Integer, Object>>();
-        queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), 
sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
-            + " from " + PART_COL_STATS
-            + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-            + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-            + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-            + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = 
? and " + TBLS + ".\"TBL_NAME\" = ? "
-            + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + 
makeParams(extraColumnNameTypeParts.size()) + ")"
-            + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-            + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-            + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\"";
-        start = doTrace ? System.nanoTime() : 0;
-        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-          List<String> extraColumnNames = new ArrayList<String>();
-          extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
-          Object qResult = executeWithArray(query.getInnerQuery(),
-              prepareParams(catName, dbName, tableName, partNames,
-                  extraColumnNames, engine), queryText);
-          if (qResult == null) {
-            return Collections.emptyList();
-          }
-          list = MetastoreDirectSqlUtils.ensureList(qResult);
-          // see the indexes for colstats in IExtrapolatePartStatus
-          Integer[] sumIndex = new Integer[] {6, 10, 11, 15};
-          for (Object[] row : list) {
-            Map<Integer, Object> indexToObject = new HashMap<Integer, 
Object>();
-            for (int ind = 1; ind < row.length; ind++) {
-              indexToObject.put(sumIndex[ind - 1], row[ind]);
-            }
-            // row[0] is the column name
-            sumMap.put((String) row[0], indexToObject);
-            Deadline.checkTimeout();
-          }
-          end = doTrace ? System.nanoTime() : 0;
-          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        }
-        for (Map.Entry<String, String[]> entry : 
extraColumnNameTypeParts.entrySet()) {
-          Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length 
+ 2];
-          String colName = entry.getKey();
-          String colType = entry.getValue()[0];
-          Long sumVal = Long.parseLong(entry.getValue()[1]);
-          // fill in colname
-          row[0] = colName;
-          // fill in coltype
-          row[1] = colType;
-          // use linear extrapolation. more complicated one can be added in the
-          // future.
-          IExtrapolatePartStatus extrapolateMethod = new 
LinearExtrapolatePartStatus();
-          // fill in colstatus
-          Integer[] index = null;
-          boolean decimal = false;
-          if (colType.toLowerCase().startsWith("decimal")) {
-            index = IExtrapolatePartStatus.indexMaps.get("decimal");
-            decimal = true;
-          } else {
-            index = 
IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
-          }
-          // if the colType is not the known type, long, double, etc, then get
-          // all index.
-          if (index == null) {
-            index = IExtrapolatePartStatus.indexMaps.get("default");
-          }
-          for (int colStatIndex : index) {
-            String colStatName = 
IExtrapolatePartStatus.colStatNames[colStatIndex];
-            // if the aggregation type is sum, we do a scale-up
-            if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Sum) {
-              Object o = sumMap.get(colName).get(colStatIndex);
-              if (o == null) {
-                row[2 + colStatIndex] = null;
-              } else {
-                Long val = MetastoreDirectSqlUtils.extractSqlLong(o);
-                row[2 + colStatIndex] = val / sumVal * (partNames.size());
-              }
-            } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Min
-                || IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Max) {
-              // if the aggregation type is min/max, we extrapolate from the
-              // left/right borders
-              if (!decimal) {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by \"" + colStatName + "\"";
-              } else {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by cast(\"" + colStatName + "\" as decimal)";
-              }
-              start = doTrace ? System.nanoTime() : 0;
-              try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
-                }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] min = (Object[]) (fqr.get(0));
-                Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
-                if (min[0] == null || max[0] == null) {
-                  row[2 + colStatIndex] = null;
-                } else {
-                  row[2 + colStatIndex] = extrapolateMethod
-                      .extrapolate(min, max, colStatIndex, indexMap);
-                }
-              }
-            } else {
-              // if the aggregation type is avg, we use the average on the 
existing ones.
-              queryText = "select "
-                  + 
"avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
-                  + 
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-                  + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
-                  + " from " + PART_COL_STATS + ""
-                  + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                  + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" 
= " + TBLS + ".\"TBL_ID\""
-                  + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                  + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                  + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                  + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                  + " group by \"COLUMN_NAME\"";
-              start = doTrace ? System.nanoTime() : 0;
-              try(QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
-                }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] avg = (Object[]) (fqr.get(0));
-                // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
-                // "AVG_DECIMAL"
-                row[2 + colStatIndex] = avg[colStatIndex - 12];
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
-              }
-            }
-          }
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, 
useDensityFunctionForNDVEstimation, ndvTuner));
-          Deadline.checkTimeout();
-        }
-      }
-      return colStats;
-    }
-  }
-
-  private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaException {
-    ColumnStatisticsData data = new ColumnStatisticsData();
-    ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++], (String)row[i++], data);
-    Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++],
-        declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++], bitVector = row[i++],
-        histogram = row[i++], avglen = row[i++], maxlen = row[i++], trues = row[i++], falses = row[i];
-    StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data,
-        llow, lhigh, dlow, dhigh, declow, dechigh, nulls, dist, bitVector, histogram, avglen, maxlen, trues, falses);
-    return cso;
-  }
-
-  private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i,
-      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
-    ColumnStatisticsData data = new ColumnStatisticsData();
-    ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++], (String) row[i++], data);
-    Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++], declow = row[i++],
-        dechigh = row[i++], nulls = row[i++], dist = row[i++], avglen = row[i++], maxlen = row[i++],
-        trues = row[i++], falses = row[i++], avgLong = row[i++], avgDouble = row[i++],
-        avgDecimal = row[i++], sumDist = row[i++];
-    StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh,
-        declow, dechigh, nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble,
-        avgDecimal, sumDist, useDensityFunctionForNDVEstimation, ndvTuner);
-    return cso;
-  }
-
-  private Object[] prepareParams(String catName, String dbName, String tableName,
-      List<String> partNames, List<String> colNames, String engine) throws MetaException {
-    Object[] params = new Object[colNames.size() + partNames.size() + 4];
-    int paramI = 0;
-    params[paramI++] = catName;
-    params[paramI++] = dbName;
-    params[paramI++] = tableName;
-    for (String colName : colNames) {
-      params[paramI++] = colName;
-    }
-    for (String partName : partNames) {
-      params[paramI++] = partName;
-    }
-    params[paramI++] = engine;
-
-    return params;
-  }
-
-  public List<ColumnStatistics> getPartitionStats(
-      final String catName, final String dbName, final String tableName, final List<String> partNames,
-      List<String> colNames, String engine, boolean enableBitVector, boolean enableKll) throws MetaException {
-    if (colNames.isEmpty() || partNames.isEmpty()) {
-      return Collections.emptyList();
-    }
-    final boolean doTrace = LOG.isDebugEnabled();
-    final String queryText0 = "select \"PARTITIONS\".\"PART_NAME\", " + getStatsList(enableBitVector, enableKll)
-        + " from " + PART_COL_STATS
-        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
-        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-        + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
-        + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
-        + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-        + " order by " + PARTITIONS +  ".\"PART_NAME\"";
-    Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
-      @Override
-      public List<Object[]> run(final List<String> inputColNames) throws MetaException {
-        Batchable<String, Object[]> b2 = new Batchable<String, Object[]>() {
-          @Override
-          public List<Object[]> run(List<String> inputPartNames) throws MetaException {
-            String queryText = String.format(queryText0,
-                makeParams(inputColNames.size()), makeParams(inputPartNames.size()));
-            long start = doTrace ? System.nanoTime() : 0;
-            Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-            try {
-              Object qResult = executeWithArray(query, prepareParams(
-                  catName, dbName, tableName, inputPartNames, inputColNames, engine), queryText);
-              MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start, (doTrace ? System.nanoTime() : 0));
-              if (qResult == null) {
-                return Collections.emptyList();
-              }
-              return MetastoreDirectSqlUtils.ensureList(qResult);
-            } finally {
-              addQueryAfterUse(query);
-            }
-          }
-        };
-        try {
-          return Batchable.runBatched(batchSize, partNames, b2);
-        } finally {
-          addQueryAfterUse(b2);
-        }
-      }
-    };
-
-    List<ColumnStatistics> result = new ArrayList<ColumnStatistics>(partNames.size());
-    String lastPartName = null;
-    int from = 0;
-    try {
-      List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
-      for (int i = 0; i <= list.size(); ++i) {
-        boolean isLast = i == list.size();
-        String partName = isLast ? null : (String) list.get(i)[0];
-        if (!isLast && partName.equals(lastPartName)) {
-          continue;
-        } else if (from != i) {
-          ColumnStatisticsDesc csd =
-              new ColumnStatisticsDesc(false, dbName, tableName);
-          csd.setCatName(catName);
-          csd.setPartName(lastPartName);
-          result.add(makeColumnStats(list.subList(from, i), csd, 1, engine));
-        }
-        lastPartName = partName;
-        from = i;
-        Deadline.checkTimeout();
-      }
-    } finally {
-      b.closeAllQueries();
-    }
-    return result;
-  }
-
-  /** The common query part for table and partition stats */
-  private String getStatsList(boolean enableBitVector, boolean enableKll) {
-    return "\"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", \"LONG_HIGH_VALUE\", "
-        + "\"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\", \"BIG_DECIMAL_LOW_VALUE\", "
-        + "\"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", \"NUM_DISTINCTS\", "
-        + (enableBitVector ? "\"BIT_VECTOR\", " : "\'\', ")
-        + (enableKll ? "\"HISTOGRAM\", " : "\'\', ")
-        + "\"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\" ";
-  }
-
-  private ColumnStatistics makeColumnStats(
-      List<Object[]> list, ColumnStatisticsDesc csd, int offset, String engine) throws MetaException {
-    ColumnStatistics result = new ColumnStatistics();
-    result.setStatsDesc(csd);
-    List<ColumnStatisticsObj> csos = new ArrayList<ColumnStatisticsObj>(list.size());
-    for (Object[] row : list) {
-      // LastAnalyzed is stored per column but thrift has it per several;
-      // get the lowest for now as nobody actually uses this field.
-      Object laObj = row[offset + 16]; // 16 is the offset of "last analyzed" field
-      if (laObj != null && (!csd.isSetLastAnalyzed() || csd.getLastAnalyzed() > MetastoreDirectSqlUtils
-          .extractSqlLong(laObj))) {
-        csd.setLastAnalyzed(MetastoreDirectSqlUtils.extractSqlLong(laObj));
-      }
-      csos.add(prepareCSObj(row, offset));
-      Deadline.checkTimeout();
-    }
-    result.setStatsObj(csos);
-    result.setEngine(engine);
-    return result;
-  }
-
-  private String makeParams(int size) {
-    // W/ size 0, query will fail, but at least we'd get to see the query in debug output.
-    return (size == 0) ? "" : repeat(",?", size).substring(1);
-  }
 
   @SuppressWarnings("unchecked")
   private <T> T executeWithArray(Query query, Object[] params, String sql) throws MetaException {
@@ -2838,9 +2233,10 @@ public List<SQLCheckConstraint> getCheckConstraints(String catName, String db_na
 
   /**
    * Drop partitions by using direct SQL queries.
-   * @param catName Metastore catalog name.
-   * @param dbName Metastore db name.
-   * @param tblName Metastore table name.
+   *
+   * @param catName   Metastore catalog name.
+   * @param dbName    Metastore db name.
+   * @param tblName   Metastore table name.
    * @param partNames Partition names to get.
    * @return List of partitions.
    */
@@ -2923,6 +2319,7 @@ public List<Void> run(List<Long> input) throws Exception {
 
   /**
    * Drops Partition-s. Should be called with the list short enough to not trip up Oracle/etc.
+   *
    * @param partitionIdList The partition identifiers to drop
    * @throws MetaException If there is an SQL exception during the execution it converted to
    * MetaException
@@ -3007,6 +2404,7 @@ private void dropPartitionsByPartitionIds(List<Long> partitionIdList) throws Met
 
   /**
    * Drops SD-s. Should be called with the list short enough to not trip up Oracle/etc.
+   *
    * @param storageDescriptorIdList The storage descriptor identifiers to drop
    * @throws MetaException If there is an SQL exception during the execution it converted to
    * MetaException
@@ -3095,6 +2493,7 @@ private void dropStorageDescriptors(List<Object> storageDescriptorIdList) throws
 
   /**
    * Drops Serde-s. Should be called with the list short enough to not trip up Oracle/etc.
+   *
    * @param serdeIdList The serde identifiers to drop
    * @throws MetaException If there is an SQL exception during the execution it converted to
    * MetaException
@@ -3124,6 +2523,7 @@ private void dropSerdes(List<Object> serdeIdList) throws MetaException {
   /**
    * Checks if the column descriptors still has references for other SD-s. If not, then removes
    * them. Should be called with the list short enough to not trip up Oracle/etc.
+   *
    * @param columnDescriptorIdList The column identifiers
    * @throws MetaException If there is an SQL exception during the execution it converted to
    * MetaException
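Aside on the extrapolation logic removed above (and relocated into DirectSqlAggrStats): when some partitions lack stats, Sum-type stats are scaled up linearly (val / count * numParts), while Min/Max-type stats are projected from the border partitions that do have stats. A minimal sketch of that border-based projection; the class and method names here are illustrative stand-ins, not the actual LinearExtrapolatePartStatus internals:

    // Minimal sketch of linear min/max extrapolation across a partition
    // sequence, assuming stats exist for only some partitions.
    public final class LinearExtrapolationSketch {
      // minVal/maxVal: smallest and largest observed high-values among the
      // partitions that do have stats; minIdx/maxIdx: their sequence numbers;
      // lastIdx: sequence number of the last partition overall.
      static double extrapolateHigh(double minVal, double maxVal,
                                    int minIdx, int maxIdx, int lastIdx) {
        if (minIdx == maxIdx) {
          return maxVal; // a single data point: nothing to extrapolate from
        }
        // extend the slope between the two observed border points to lastIdx
        double slope = (maxVal - minVal) / (maxIdx - minIdx);
        return maxVal + slope * (lastIdx - maxIdx);
      }

      public static void main(String[] args) {
        // partitions 0..9; stats present only at indexes 2 (40) and 6 (80)
        System.out.println(extrapolateHigh(40, 80, 2, 6, 9)); // prints 110.0
      }
    }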
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
index 45e89ab40df..19b92f5177c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
@@ -42,6 +42,7 @@
 import javax.jdo.PersistenceManager;
 import javax.jdo.Query;
 import java.math.BigDecimal;
+import java.math.MathContext;
 import java.sql.Blob;
 import java.sql.Clob;
 import java.sql.SQLException;
@@ -53,6 +54,8 @@
 import java.util.Map;
 import java.util.TreeMap;
 
+import static org.apache.commons.lang3.StringUtils.repeat;
+
 /**
  * Helper utilities used by DirectSQL code in HiveMetastore.
  */
@@ -646,4 +649,72 @@ public static void throwMetaOrRuntimeException(Exception e) throws MetaException
       throw new RuntimeException(e);
     }
   }
+
+  public static Object[] prepareParams(String catName, String dbName, String tableName,
+                                 List<String> partNames, List<String> colNames, String engine) {
+    Object[] params = new Object[colNames.size() + partNames.size() + 4];
+    int paramI = 0;
+    params[paramI++] = catName;
+    params[paramI++] = dbName;
+    params[paramI++] = tableName;
+    for (String colName : colNames) {
+      params[paramI++] = colName;
+    }
+    for (String partName : partNames) {
+      params[paramI++] = partName;
+    }
+    params[paramI] = engine;
+
+    return params;
+  }
+
+  public static String getFullyQualifiedName(String schema, String tblName) {
+    return ((schema == null || schema.isEmpty()) ? "" : "\"" + schema + "\".\"")
+            + "\"" + tblName + "\"";
+  }
+
+  public static String makeParams(int size) {
+    // W/ size 0, query will fail, but at least we'd get to see the query in debug output.
+    return (size == 0) ? "" : repeat(",?", size).substring(1);
+  }
+
+  public static Object sum(Object first, Object second) {
+    if (first == null) {
+      return second;
+    }
+    if (second == null) {
+      return first;
+    }
+    return (new BigDecimal(first.toString())).add(new BigDecimal(second.toString()));
+  }
+
+  public static Object divide(Object dividend, Object divisor) {
+    if (dividend == null || divisor == null) {
+      return null;
+    }
+
+    BigDecimal divisorVal = new BigDecimal(divisor.toString());
+    if (divisorVal.equals(new BigDecimal("0"))) return null;
+    return (new BigDecimal(dividend.toString())).divide(divisorVal, MathContext.DECIMAL64);
+  }
+
+  public static Object min(Object first, Object second) {
+    if (first == null) {
+      return second;
+    }
+    if (second == null) {
+      return first;
+    }
+    return (new BigDecimal(first.toString())).min(new BigDecimal(second.toString()));
+  }
+
+  public static Object max(Object first, Object second) {
+    if (first == null) {
+      return second;
+    }
+    if (second == null) {
+      return first;
+    }
+    return (new BigDecimal(first.toString())).max(new BigDecimal(second.toString()));
+  }
 }
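The sum/min/max/divide helpers added above are deliberately null-tolerant: a null operand means "no value for this partition" and is skipped rather than poisoning the aggregate, and divide additionally guards against a zero divisor. A quick usage sketch; the demo class name is hypothetical, while the helper calls are the ones introduced by this patch:

    import org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils;

    // Hypothetical demo; inputs mimic values read back from PART_COL_STATS
    // rows, where any individual stat may be null.
    public class AggrHelpersDemo {
      public static void main(String[] args) {
        Object a = 7L, missing = null;

        // null is treated as "absent", not zero, so it never skews a sum
        System.out.println(MetastoreDirectSqlUtils.sum(a, missing)); // 7
        System.out.println(MetastoreDirectSqlUtils.sum(a, "2.5"));   // 9.5
        System.out.println(MetastoreDirectSqlUtils.min(missing, a)); // 7
        System.out.println(MetastoreDirectSqlUtils.max(a, "9"));     // 9

        // divide rejects both null operands and a zero divisor
        System.out.println(MetastoreDirectSqlUtils.divide(a, 0));       // null
        System.out.println(MetastoreDirectSqlUtils.divide(missing, a)); // null
      }
    }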
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 3820c8c500e..8a2c94a6443 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -327,6 +327,7 @@ private enum TXN_STATUS {
   protected PersistenceManager pm = null;
   protected SQLGenerator sqlGenerator = null;
   private MetaStoreDirectSql directSql = null;
+  private DirectSqlAggrStats directSqlAggrStats;
   protected DatabaseProduct dbType = null;
   private PartitionExpressionProxy expressionProxy = null;
   protected Configuration conf;
@@ -365,6 +366,7 @@ public void setConf(Configuration conf) {
     // most recent instance of the pmf
     pm = null;
     directSql = null;
+    directSqlAggrStats = null;
     expressionProxy = null;
     openTrasactionCalls = 0;
     currentTransaction = null;
@@ -407,6 +409,7 @@ private void initialize() {
         String schema = PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema");
         schema = org.apache.commons.lang3.StringUtils.defaultIfBlank(schema, null);
         directSql = new MetaStoreDirectSql(pm, conf, schema);
+        directSqlAggrStats = new DirectSqlAggrStats(pm,conf,schema);
       }
     }
     if (propertyStore == null) {
@@ -9569,7 +9572,7 @@ protected ColumnStatistics getTableColumnStatisticsInternal(
         normalizeIdentifier(tableName), allowSql, allowJdo, null) {
       @Override
       protected ColumnStatistics getSqlResult(GetHelper<ColumnStatistics> ctx) throws MetaException {
-        return directSql.getTableStats(catName, dbName, tblName, colNames, engine, enableBitVector, enableKll);
+        return directSqlAggrStats.getTableStats(catName, dbName, tblName, colNames, engine, enableBitVector, enableKll);
       }
 
       @Override
@@ -9693,7 +9696,7 @@ protected List<ColumnStatistics> getPartitionColumnStatisticsInternal(
       @Override
       protected List<ColumnStatistics> getSqlResult(
           GetHelper<List<ColumnStatistics>> ctx) throws MetaException {
-        return directSql.getPartitionStats(
+        return directSqlAggrStats.getPartitionStats(
             catName, dbName, tblName, partNames, colNames, engine, enableBitVector, enableKll);
       }
       @Override
@@ -9811,7 +9814,7 @@ public List<MetaStoreServerUtils.ColStatsObjWithSourceInfo> getPartitionColStats
       @Override
       protected List<MetaStoreServerUtils.ColStatsObjWithSourceInfo> getSqlResult(
           GetHelper<List<MetaStoreServerUtils.ColStatsObjWithSourceInfo>> ctx) throws MetaException {
-        return directSql.getColStatsForAllTablePartitions(catName, dbName, enableBitVector, enableKll);
+        return directSqlAggrStats.getColStatsForAllTablePartitions(catName, dbName, enableBitVector, enableKll);
      }
 
       @Override
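The ObjectStore changes above only swap the delegate inside the existing GetHelper pattern: direct SQL is tried first, and JDO remains the fallback. A stripped-down sketch of that control flow under simplified, assumed names; the real GetHelper also manages transactions and error bookkeeping:

    import java.util.function.Supplier;

    // Simplified illustration of the direct-SQL-first, JDO-fallback flow;
    // not the actual GetHelper API.
    final class FallbackSketch {
      static <T> T get(boolean allowSql, boolean allowJdo,
                       Supplier<T> sqlResult, Supplier<T> jdoResult) {
        if (allowSql) {
          try {
            // e.g. directSqlAggrStats.getTableStats(...)
            return sqlResult.get();
          } catch (RuntimeException e) {
            if (!allowJdo) {
              throw e; // no fallback permitted
            }
            // fall through to JDO on direct-SQL failure
          }
        }
        return jdoResult.get(); // e.g. the DataNucleus/JDO implementation
      }
    }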
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
index f755aec6b19..0d22719dd6a 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -106,6 +106,8 @@
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
@@ -776,7 +778,7 @@ public void testConcurrentDropPartitions() throws MetaException, InvalidObjectEx
   @Test
   public void testDirectSQLDropPartitionsCacheInSession()
       throws Exception {
-    createPartitionedTable(false, false);
+    createPartitionedTable(false, false, new HashSet<>());
     // query the partitions with JDO
     List<Partition> partitions;
     try(AutoCloseable c = deadline()) {
@@ -807,7 +809,7 @@ public void testDirectSQLDropPartitionsCacheCrossSession()
     ObjectStore objectStore2 = new ObjectStore();
     objectStore2.setConf(conf);
 
-    createPartitionedTable(false, false);
+    createPartitionedTable(false, false, new HashSet<>());
     GetPartitionsArgs args = new GetPartitionsArgs.GetPartitionsArgsBuilder().max(10).build();
     // query the partitions with JDO in the 1st session
     List<Partition> partitions;
@@ -842,7 +844,7 @@ public void testDirectSQLDropPartitionsCacheCrossSession()
   @Test
   public void testDirectSQLDropPartitionsCleanup() throws Exception {
 
-    createPartitionedTable(true, true);
+    createPartitionedTable(true, true, new HashSet<>());
 
     // Check, that every table in the expected state before the drop
     checkBackendTableSize("PARTITIONS", 3);
@@ -883,7 +885,7 @@ public void testDirectSQLDropPartitionsCleanup() throws Exception {
 
   @Test
   public void testDirectSQLCDsCleanup() throws Exception {
-    createPartitionedTable(true, true);
+    createPartitionedTable(true, true, new HashSet<>());
     // Checks there is only one CD before altering partition
     checkBackendTableSize("PARTITIONS", 3);
     checkBackendTableSize("CDS", 1);
@@ -915,7 +917,7 @@ public void testDirectSQLCDsCleanup() throws Exception {
 
   @Test
   public void testTableStatisticsOps() throws Exception {
-    createPartitionedTable(true, true);
+    createPartitionedTable(true, true, new HashSet<>());
 
     List<ColumnStatistics> tabColStats;
     try (AutoCloseable c = deadline()) {
@@ -958,13 +960,13 @@ public void testTableStatisticsOps() throws Exception {
 
   @Test
   public void testDeleteTableColumnStatisticsWhenEngineHasSpecialCharacter() throws Exception {
-    createPartitionedTable(true, true);
+    createPartitionedTable(true, true, new HashSet<>());
     objectStore.deleteTableColumnStatistics(DEFAULT_CATALOG_NAME, DB1, TABLE1, Arrays.asList("test_col1"), "special '");
   }
 
   @Test
   public void testPartitionStatisticsOps() throws Exception {
-    createPartitionedTable(true, true);
+    createPartitionedTable(true, true, new HashSet<>());
 
     List<List<ColumnStatistics>> stat;
     try (AutoCloseable c = deadline()) {
@@ -1015,35 +1017,93 @@ public void testPartitionStatisticsOps() throws Exception {
 
   @Test
   public void testDeletePartitionColumnStatisticsWhenEngineHasSpecialCharacter() throws Exception {
-    createPartitionedTable(true, true);
+    createPartitionedTable(true, true, new HashSet<>());
     objectStore.deletePartitionColumnStatistics(DEFAULT_CATALOG_NAME, DB1, TABLE1,
             List.of("test_part_col=a2"), null, "special '");
   }
 
-
-  @Test
-  public void testAggrStatsUseDB() throws Exception {
+  private void setAggrConf(boolean enableBitVector, boolean enableKll, int batchSize) {
     Configuration conf2 = MetastoreConf.newMetastoreConf(conf);
-    MetastoreConf.setBoolVar(conf2, ConfVars.STATS_FETCH_BITVECTOR, false);
-    MetastoreConf.setBoolVar(conf2, ConfVars.STATS_FETCH_KLL, false);
+    MetastoreConf.setBoolVar(conf2, ConfVars.STATS_FETCH_BITVECTOR, enableBitVector);
+    MetastoreConf.setBoolVar(conf2, ConfVars.STATS_FETCH_KLL, enableKll);
+    MetastoreConf.setLongVar(conf2, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE, batchSize);
     objectStore.setConf(conf2);
+  }
 
-    createPartitionedTable(true, true);
-
-    AggrStats aggrStats;
+  private AggrStats runStatsAggregation() throws Exception {
     try (AutoCloseable c = deadline()) {
-      aggrStats = objectStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, DB1, TABLE1,
-          Arrays.asList("test_part_col=a0", "test_part_col=a1", "test_part_col=a2"),
-          Collections.singletonList("test_part_col"), ENGINE);
+      return objectStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, DB1, TABLE1,
+              Arrays.asList("test_part_col=a0", "test_part_col=a1", "test_part_col=a2"),
+              Collections.singletonList("test_part_col"), ENGINE);
     }
-    List<ColumnStatisticsObj> stats = aggrStats.getColStats();
-    Assert.assertEquals(1, stats.size());
+  }
+
+  private void assertAggrStats(AggrStats aggrStats, ColumnStatisticsData computedStats) {
+    Assert.assertEquals(1, aggrStats.getColStats().size());
     Assert.assertEquals(3, aggrStats.getPartsFound());
+    ColumnStatisticsData expectedStats = new ColStatsBuilder<>(long.class).numNulls(3).numDVs(2)
+            .low(3L).high(4L).build();
+    assertEqualStatistics(expectedStats, computedStats);
+  }
+
+  private void statsAggrResourceCleanup()
+          throws Exception {
+    try (AutoCloseable c = deadline()) {
+      objectStore.dropPartitionsInternal(DEFAULT_CATALOG_NAME, DB1, TABLE1,
+              Arrays.asList("test_part_col=a0", "test_part_col=a1", "test_part_col=a2"), true, true);
+      objectStore.dropTable(DEFAULT_CATALOG_NAME, DB1, TABLE1);
+      objectStore.dropDatabase(DEFAULT_CATALOG_NAME, DB1);
+    }
+  }
+
+  @Test
+  public void testStatsAggrWithKll() throws Exception {
+    setAggrConf(false, true, 2);
+    createPartitionedTable(true, true, new HashSet<>());
+    AggrStats aggrStats = runStatsAggregation();
+    ColumnStatisticsData computedStats = aggrStats.getColStats().get(0).getStatsData();
+    computedStats = new ColStatsBuilder<>(long.class)
+            .numNulls(computedStats.getLongStats().getNumNulls()).numDVs(computedStats.getLongStats().getNumDVs())
+            .low(computedStats.getLongStats().getLowValue()).high(computedStats.getLongStats().getHighValue()).build();
+    assertAggrStats(aggrStats, computedStats);
+    statsAggrResourceCleanup();
+  }
 
+  @Test
+  public void testStatsAggrWithBitVector() throws Exception {
+    setAggrConf(true, false, 2);
+    createPartitionedTable(true, true, new HashSet<>());
+    AggrStats aggrStats = runStatsAggregation();
     ColumnStatisticsData computedStats = aggrStats.getColStats().get(0).getStatsData();
+    computedStats = new ColStatsBuilder<>(long.class)
+            .numNulls(computedStats.getLongStats().getNumNulls()).numDVs(computedStats.getLongStats().getNumDVs())
+            .low(computedStats.getLongStats().getLowValue()).high(computedStats.getLongStats().getHighValue()).build();
+    assertAggrStats(aggrStats, computedStats);
+    statsAggrResourceCleanup();
+  }
+
+  @Test
+  public void testStatsAggrWithBackendDB() throws Exception {
+    setAggrConf(false, false, 2);
+    createPartitionedTable(true, true, new HashSet<>());
+    AggrStats aggrStats = runStatsAggregation();
+    ColumnStatisticsData computedStats = aggrStats.getColStats().get(0).getStatsData();
+    assertAggrStats(aggrStats, computedStats);
+    statsAggrResourceCleanup();
+  }
+
+  @Test
+  public void testMissingPartsStatsAggrWithBackendDB() throws Exception {
+    setAggrConf(false, false, 2);
+    createPartitionedTable(true, true, new HashSet<>(Collections.singletonList(1)));
+    AggrStats aggrStats = runStatsAggregation();
+    ColumnStatisticsData computedStats = aggrStats.getColStats().get(0).getStatsData();
+    Assert.assertEquals(1, aggrStats.getColStats().size());
+    Assert.assertEquals(2, aggrStats.getPartsFound());
     ColumnStatisticsData expectedStats = new ColStatsBuilder<>(long.class).numNulls(3).numDVs(2)
-        .low(3L).high(4L).build();
+            .low(3L).high(4L).build();
     assertEqualStatistics(expectedStats, computedStats);
+    statsAggrResourceCleanup();
   }
 
   /**
@@ -1051,7 +1111,7 @@ public void testAggrStatsUseDB() throws Exception {
    * @param withPrivileges Should we create privileges as well
    * @param withStatistics Should we create statitics as well
    */
-  private void createPartitionedTable(boolean withPrivileges, boolean withStatistics)
+  private void createPartitionedTable(boolean withPrivileges, boolean withStatistics, Set<Integer> statsMissingIndices)
       throws Exception {
     Database db1 = new DatabaseBuilder()
                        .setName(DB1)
@@ -1115,6 +1175,9 @@ private void createPartitionedTable(boolean withPrivileges, boolean withStatisti
       }
 
       if (withStatistics) {
+        if(statsMissingIndices.contains(i)){
+          continue;
+        }
         ColumnStatistics stats = new ColumnStatistics();
         ColumnStatisticsDesc desc = new ColumnStatisticsDesc();
         desc.setCatName(tbl1.getCatName());
@@ -1802,7 +1865,7 @@ public void testListPackage() throws Exception {
   @Test
   public void testSavePoint() throws Exception {
     List<String> partNames = Arrays.asList("test_part_col=a0", "test_part_col=a1", "test_part_col=a2");
-    createPartitionedTable(true, false);
+    createPartitionedTable(true, false, new HashSet<>());
     Assert.assertEquals(3, objectStore.getPartitionCount());
 
     objectStore.openTransaction();
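One note on the test changes: the Set<Integer> parameter added to createPartitionedTable lets a test withhold statistics for chosen partition indexes, which is how testMissingPartsStatsAggrWithBackendDB forces partsFound == 2 out of 3 and exercises the extrapolation path. The two call shapes used in the hunks above, shown here as an in-test usage recap rather than standalone code:

    // all three partitions get statistics (the old two-argument behaviour)
    createPartitionedTable(true, true, new HashSet<>());

    // partition index 1 is created without statistics, so
    // get_aggr_stats_for must extrapolate from the other two
    createPartitionedTable(true, true, new HashSet<>(Collections.singletonList(1)));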

