This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6f6ab84a7bf HIVE-29203: get_aggr_stats_for doesn't aggregate stats
when direct sql… (#6089)
6f6ab84a7bf is described below
commit 6f6ab84a7bff723554f1edd689781a9aac800919
Author: ramitg254 <[email protected]>
AuthorDate: Fri Jan 23 12:27:41 2026 +0530
HIVE-29203: get_aggr_stats_for doesn't aggregate stats when direct sql…
(#6089)
---
.../hadoop/hive/metastore/DirectSqlAggrStats.java | 620 ++++++++++++++++++++
.../hadoop/hive/metastore/DirectSqlUpdatePart.java | 2 +-
.../hive/metastore/IExtrapolatePartStatus.java | 34 ++
.../hadoop/hive/metastore/MetaStoreDirectSql.java | 630 +--------------------
.../hive/metastore/MetastoreDirectSqlUtils.java | 71 +++
.../apache/hadoop/hive/metastore/ObjectStore.java | 9 +-
.../hadoop/hive/metastore/TestObjectStore.java | 111 +++-
7 files changed, 834 insertions(+), 643 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlAggrStats.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlAggrStats.java
new file mode 100644
index 00000000000..9e1d4273060
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlAggrStats.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.COLNAME;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.COLTYPE;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.LONG_LOW_VALUE;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.LONG_HIGH_VALUE;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.DOUBLE_LOW_VALUE;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.DOUBLE_HIGH_VALUE;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.BIG_DECIMAL_LOW_VALUE;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.BIG_DECIMAL_HIGH_VALUE;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.NUM_NULLS;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.NUM_DISTINCTS;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.AVG_COL_LEN;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.MAX_COL_LEN;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.NUM_TRUES;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.NUM_FALSES;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.SUM_NDV_LONG;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.SUM_NDV_DOUBLE;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.SUM_NDV_DECIMAL;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.COUNT_ROWS;
+import static
org.apache.hadoop.hive.metastore.IExtrapolatePartStatus.DBStatsAggrIndices.SUM_NUM_DISTINCTS;
+import static
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.getFullyQualifiedName;
+import static
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.makeParams;
+import static
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.executeWithArray;
+import static
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.prepareParams;
+
+class DirectSqlAggrStats {
+ private static final int NO_BATCHING = -1;
+ private static final int DETECT_BATCHING = 0;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DirectSqlAggrStats.class);
+ private final PersistenceManager pm;
+ private final int batchSize;
+
+ @java.lang.annotation.Target(java.lang.annotation.ElementType.FIELD)
+ @java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
+ private @interface TableName {
+ }
+
+ @TableName
+ private String DBS, TBLS, PARTITIONS, PART_COL_STATS, TAB_COL_STATS;
+
+ public DirectSqlAggrStats(PersistenceManager pm, Configuration conf, String
schema) {
+ this.pm = pm;
+ DatabaseProduct dbType = PersistenceManagerProvider.getDatabaseProduct();
+ int configBatchSize =
+ MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE);
+ if (configBatchSize == DETECT_BATCHING) {
+ configBatchSize = dbType.needsInBatching() ? 1000 : NO_BATCHING;
+ }
+ this.batchSize = configBatchSize;
+ ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder =
+ new ImmutableMap.Builder<>();
+
+ for (java.lang.reflect.Field f : this.getClass().getDeclaredFields()) {
+ if (f.getAnnotation(DirectSqlAggrStats.TableName.class) == null) {
+ continue;
+ }
+ try {
+ String value = getFullyQualifiedName(schema, f.getName());
+ f.set(this, value);
+ fieldNameToTableNameBuilder.put(f.getName(), value);
+ } catch (IllegalArgumentException | IllegalAccessException e) {
+ throw new RuntimeException("Internal error, cannot set " +
f.getName());
+ }
+ }
+ }
+
+ /**
+ * Retrieve the column statistics for the specified columns of the table.
NULL
+ * is returned if the columns are not provided.
+ *
+ * @param catName the catalog name of the table
+ * @param dbName the database name of the table
+ * @param tableName the table name
+ * @param colNames the list of the column names
+ * @param engine engine making the request
+ * @return the column statistics for the specified columns
+ * @throws MetaException
+ */
+ public ColumnStatistics getTableStats(final String catName,
+ final String dbName,
+ final String tableName, List<String>
colNames,
+ String engine, boolean enableBitVector,
+ boolean enableKll) throws
MetaException {
+ if (colNames == null || colNames.isEmpty()) {
+ return null;
+ }
+ final boolean doTrace = LOG.isDebugEnabled();
+ final String queryText0 = "select " + getStatsList(enableBitVector,
enableKll) + " from " + TAB_COL_STATS
+ + " inner join " + TBLS + " on " + TAB_COL_STATS + ".\"TBL_ID\" = " +
TBLS + ".\"TBL_ID\" "
+ + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\" "
+ + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ?
and " + TBLS + ".\"TBL_NAME\" = ?"
+ + " and \"ENGINE\" = ? and \"COLUMN_NAME\" in (";
+ Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
+ @Override
+ public List<Object[]> run(List<String> input) throws MetaException {
+ Object[] params = new Object[input.size() + 4];
+ params[0] = catName;
+ params[1] = dbName;
+ params[2] = tableName;
+ params[3] = engine;
+ for (int i = 0; i < input.size(); ++i) {
+ params[i + 4] = input.get(i);
+ }
+ String queryText = queryText0 + makeParams(input.size()) + ")";
+ long start = doTrace ? System.nanoTime() : 0;
+ Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+ try {
+ Object qResult = executeWithArray(query, params, queryText);
+ MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0 + "...)",
start,
+ (doTrace ? System.nanoTime() : 0));
+ if (qResult == null) {
+ return null;
+ }
+ return MetastoreDirectSqlUtils.ensureList(qResult);
+ } finally {
+ addQueryAfterUse(query);
+ }
+ }
+ };
+ List<Object[]> list;
+ try {
+ list = Batchable.runBatched(batchSize, colNames, b);
+ if (list != null) {
+ list = new ArrayList<>(list);
+ }
+ } finally {
+ b.closeAllQueries();
+ }
+
+ if (list == null || list.isEmpty()) {
+ return null;
+ }
+ ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName,
tableName);
+ csd.setCatName(catName);
+ return makeColumnStats(list, csd, 0, engine);
+ }
+
+ public List<ColumnStatisticsObj> columnStatisticsObjForPartitions(
+ final String catName, final String dbName,
+ final String tableName, final List<String> partNames,
+ List<String> colNames, String engine, long partsFound,
+ final boolean useDensityFunctionForNDVEstimation,
+ final double ndvTuner, final boolean enableBitVector,
+ boolean enableKll) throws MetaException {
+ final boolean areAllPartsFound = (partsFound == partNames.size());
+ return Batchable.runBatched(batchSize, colNames, new Batchable<String,
ColumnStatisticsObj>() {
+ @Override
+ public List<ColumnStatisticsObj> run(final List<String> inputColNames)
throws MetaException {
+ return columnStatisticsObjForPartitionsBatch(catName, dbName,
tableName,
+ partNames, inputColNames, engine, areAllPartsFound,
+ useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector,
enableKll);
+ }
+ });
+ }
+
+ /**
+ * Should be called with the list short enough to not trip up Oracle/etc.
+ */
+ private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(
+ String catName,
+ String dbName, String tableName,
+ List<String> partNames, List<String> colNames, String engine,
+ boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation,
+ double ndvTuner, boolean enableBitVector,
+ boolean enableKll) throws MetaException {
+ if (enableBitVector || enableKll) {
+ return aggrStatsUseJava(catName, dbName, tableName, partNames,
+ colNames, engine, areAllPartsFound,
useDensityFunctionForNDVEstimation,
+ ndvTuner, enableBitVector, enableKll);
+ } else {
+ return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames,
engine,
+ useDensityFunctionForNDVEstimation, ndvTuner);
+ }
+ }
+
+ private List<ColumnStatisticsObj> aggrStatsUseJava(
+ String catName, String dbName, String tableName,
+ List<String> partNames, List<String> colNames, String engine, boolean
areAllPartsFound,
+ boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean
enableBitVector,
+ boolean enableKll) throws MetaException {
+ // 1. get all the stats for colNames in partNames;
+ List<ColumnStatistics> partStats =
+ getPartitionStats(catName, dbName, tableName, partNames, colNames,
engine, enableBitVector, enableKll);
+ // 2. use util function to aggr stats
+ return MetaStoreServerUtils.aggrPartitionStats(partStats, catName, dbName,
tableName, partNames, colNames,
+ areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
+ }
+
+ private List<ColumnStatisticsObj> aggrStatsUseDB(
+ String catName, String dbName, String tableName,
+ List<String> partNames, List<String> colNames, String engine,
+ boolean useDensityFunctionForNDVEstimation, double ndvTuner)
+ throws MetaException {
+ // TODO: all the extrapolation logic should be moved out of this class,
+ // only mechanical data retrieval should remain here.
+ String queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+ + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"),
min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+ + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)),
max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+ + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+ + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"),
sum(\"NUM_FALSES\"), "
+ // The following data is used to compute a partitioned table's NDV
based
+ // on partitions' NDV when useDensityFunctionForNDVEstimation = true.
Global NDVs cannot be
+ // accurately derived from partition NDVs, because the domain of
column value two partitions
+ // can overlap. If there is no overlap then global NDV is just the sum
+ // of partition NDVs (UpperBound). But if there is some overlay then
+ // global NDV can be anywhere between sum of partition NDVs (no
overlap)
+ // and same as one of the partition NDV (domain of column value in all
other
+ // partitions is subset of the domain value in one of the partition)
+ // (LowerBound).But under uniform distribution, we can roughly
estimate the global
+ // NDV by leveraging the min/max values.
+ // And, we also guarantee that the estimation makes sense by comparing
it to the
+ // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
+ // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+ + "sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\"
as decimal)),"
+ +
"sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+ + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+ + "count(1),"
+ + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
+ + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " +
TBLS + ".\"TBL_ID\""
+ + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\""
+ + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ?
and " + TBLS + ".\"TBL_NAME\" = ? "
+ + " and \"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS +
".\"PART_NAME\" in (%2$s)"
+ + " and \"ENGINE\" = ? " + " group by \"COLUMN_NAME\",
\"COLUMN_TYPE\"";
+
+ boolean doTrace = LOG.isDebugEnabled();
+
+ List<ColumnStatisticsObj> colStats = new ArrayList<>(colNames.size());
+ List<Object[]> partialStatsRows = new ArrayList<>(colNames.size());
+ columnWiseStatsMerger(queryText, catName, dbName, tableName,
+ colNames, partNames, colStats, partialStatsRows,
+ engine, useDensityFunctionForNDVEstimation, ndvTuner, doTrace);
+
+ // Extrapolation is needed for partialStatsRows.
+ if (partialStatsRows.size() != 0) {
+ Map<String, Integer> indexMap = new HashMap<String, Integer>();
+ for (int index = 0; index < partNames.size(); index++) {
+ indexMap.put(partNames.get(index), index);
+ }
+
+ for (Object[] row : partialStatsRows) {
+ String colName = row[COLNAME.idx()].toString();
+ String colType = row[COLTYPE.idx()].toString();
+ BigDecimal countVal = new BigDecimal(row[COUNT_ROWS.idx()].toString());
+
+ // use linear extrapolation. more complicated one can be added in the
+ // future.
+ IExtrapolatePartStatus extrapolateMethod = new
LinearExtrapolatePartStatus();
+ // fill in colstatus
+ Integer[] index;
+ boolean decimal = false;
+ if (colType.toLowerCase().startsWith("decimal")) {
+ index = IExtrapolatePartStatus.indexMaps.get("decimal");
+ decimal = true;
+ } else {
+ index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
+ }
+ // if the colType is not the known type, long, double, etc, then get
+ // all index.
+ if (index == null) {
+ index = IExtrapolatePartStatus.indexMaps.get("default");
+ }
+
+ for (int colStatIndex : index) {
+ String colStatName =
IExtrapolatePartStatus.colStatNames[colStatIndex];
+ // if the aggregation type is sum, we do a scale-up
+ if (
+ IExtrapolatePartStatus.aggrTypes[colStatIndex] ==
IExtrapolatePartStatus.AggrType.Sum
+ ) {
+ // +3 only for the case of SUM_NUM_DISTINCTS which is after count
rows index
+ int rowIndex = (colStatIndex == 15) ? colStatIndex + 3 :
colStatIndex + 2;
+ if (row[rowIndex] != null) {
+ Long val = MetastoreDirectSqlUtils.extractSqlLong(row[rowIndex]);
+ row[rowIndex] = val / countVal.longValue() * (partNames.size());
+ }
+ } else if (
+ IExtrapolatePartStatus.aggrTypes[colStatIndex] ==
IExtrapolatePartStatus.AggrType.Min ||
+ IExtrapolatePartStatus.aggrTypes[colStatIndex] ==
IExtrapolatePartStatus.AggrType.Max
+ ) {
+ // if the aggregation type is min/max, we extrapolate from the
+ // left/right borders
+ String orderByExpr =
+ decimal ? "cast(\"" + colStatName + "\" as decimal)" : "\"" +
colStatName + "\"";
+
+ queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " +
PART_COL_STATS
+ + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" =
" + TBLS + ".\"TBL_ID\""
+ + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS
+ ".\"DB_ID\""
+ + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS +
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
+ + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
+ + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
+ + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+ + " order by " + orderByExpr;
+
+ Batchable<String, Object[]> columnWisePartitionBatches =
+ columnWisePartitionBatcher(queryText, catName, dbName,
tableName, partNames, engine, doTrace);
+ try {
+ List<Object[]> list =
+ Batchable.runBatched(batchSize,
Collections.singletonList(colName), columnWisePartitionBatches);
+ Object[] min = list.getFirst();
+ Object[] max = list.getLast();
+ if (batchSize > 0) {
+ for (int i = Math.min(batchSize - 1, list.size() - 1); i <
list.size(); i += batchSize) {
+ Object[] posMax = list.get(i);
+ if (new BigDecimal(max[0].toString()).compareTo(new
BigDecimal(posMax[0].toString())) < 0) {
+ max = posMax;
+ }
+ int j = i + 1;
+ if (j < list.size()) {
+ Object[] posMin = list.get(j);
+ if (new BigDecimal(min[0].toString()).compareTo(new
BigDecimal(posMin[0].toString())) > 0) {
+ min = posMin;
+ }
+ }
+ }
+ }
+ if (min[0] == null || max[0] == null) {
+ row[2 + colStatIndex] = null;
+ } else {
+ row[2 + colStatIndex] = extrapolateMethod.extrapolate(min,
max, colStatIndex, indexMap);
+ }
+ } finally {
+ columnWisePartitionBatches.closeAllQueries();
+ }
+ }
+ }
+ colStats.add(prepareCSObjWithAdjustedNDV
+ (row, useDensityFunctionForNDVEstimation, ndvTuner));
+ Deadline.checkTimeout();
+ }
+ }
+ return colStats;
+ }
+
+ private void columnWiseStatsMerger(
+ final String queryText, final String catName, final String dbName,
+ final String tableName, final List<String> colNames, final List<String>
partNames,
+ final List<ColumnStatisticsObj> colStats, final List<Object[]>
partialStatsRows, final String engine,
+ final boolean useDensityFunctionForNDVEstimation, final double ndvTuner,
+ final boolean doTrace
+ ) throws MetaException {
+ Batchable<String, Object[]> columnWisePartitionBatches =
+ columnWisePartitionBatcher(queryText, catName, dbName, tableName,
partNames, engine, doTrace);
+ try {
+ List<Object[]> unmergedColStatslist = Batchable.runBatched(batchSize,
colNames, columnWisePartitionBatches);
+ Map<String, Object[]> mergedColStatsMap = new HashMap<>();
+ for (Object[] unmergedRow : unmergedColStatslist) {
+ String colName = (String) unmergedRow[0];
+ Object[] mergedRow = mergedColStatsMap.getOrDefault(colName, new
Object[21]);
+ mergeBackendDBStats(mergedRow, unmergedRow);
+ mergedColStatsMap.put(colName, mergedRow);
+ }
+
+ for (Map.Entry<String, Object[]> entry : mergedColStatsMap.entrySet()) {
+ BigDecimal partCount = new
BigDecimal(entry.getValue()[COUNT_ROWS.idx()].toString());
+ if (partCount.equals(new BigDecimal(partNames.size())) ||
partCount.longValue() < 2) {
+ colStats.add(
+ prepareCSObjWithAdjustedNDV(entry.getValue(),
useDensityFunctionForNDVEstimation, ndvTuner));
+ } else {
+ partialStatsRows.add(entry.getValue());
+ }
+ Deadline.checkTimeout();
+ }
+ } finally {
+ columnWisePartitionBatches.closeAllQueries();
+ }
+ }
+
+ private void mergeBackendDBStats(Object[] row1, Object[] row2) {
+ if (row1[COLNAME.idx()] == null) {
+ row1[COLNAME.idx()] = row2[COLNAME.idx()];
+ row1[COLTYPE.idx()] = row2[COLTYPE.idx()];
+ }
+ row1[LONG_LOW_VALUE.idx()] =
MetastoreDirectSqlUtils.min(row1[LONG_LOW_VALUE.idx()],
row2[LONG_LOW_VALUE.idx()]);
+ row1[LONG_HIGH_VALUE.idx()] =
MetastoreDirectSqlUtils.max(row1[LONG_HIGH_VALUE.idx()],
row2[LONG_HIGH_VALUE.idx()]);
+ row1[DOUBLE_LOW_VALUE.idx()] =
MetastoreDirectSqlUtils.min(row1[DOUBLE_LOW_VALUE.idx()],
row2[DOUBLE_LOW_VALUE.idx()]);
+ row1[DOUBLE_HIGH_VALUE.idx()] =
MetastoreDirectSqlUtils.max(row1[DOUBLE_HIGH_VALUE.idx()],
row2[DOUBLE_HIGH_VALUE.idx()]);
+ row1[BIG_DECIMAL_LOW_VALUE.idx()] =
MetastoreDirectSqlUtils.min(row1[BIG_DECIMAL_LOW_VALUE.idx()],
row2[BIG_DECIMAL_LOW_VALUE.idx()]);
+ row1[BIG_DECIMAL_HIGH_VALUE.idx()] =
MetastoreDirectSqlUtils.max(row1[BIG_DECIMAL_HIGH_VALUE.idx()],
row2[BIG_DECIMAL_HIGH_VALUE.idx()]);
+ row1[NUM_NULLS.idx()] = MetastoreDirectSqlUtils.sum(row1[NUM_NULLS.idx()],
row2[NUM_NULLS.idx()]);
+ row1[NUM_DISTINCTS.idx()] =
MetastoreDirectSqlUtils.max(row1[NUM_DISTINCTS.idx()],
row2[NUM_DISTINCTS.idx()]);
+ row1[AVG_COL_LEN.idx()] =
MetastoreDirectSqlUtils.max(row1[AVG_COL_LEN.idx()], row2[AVG_COL_LEN.idx()]);
+ row1[MAX_COL_LEN.idx()] =
MetastoreDirectSqlUtils.max(row1[MAX_COL_LEN.idx()], row2[MAX_COL_LEN.idx()]);
+ row1[NUM_TRUES.idx()] = MetastoreDirectSqlUtils.sum(row1[NUM_TRUES.idx()],
row2[NUM_TRUES.idx()]);
+ row1[NUM_FALSES.idx()] =
MetastoreDirectSqlUtils.sum(row1[NUM_FALSES.idx()], row2[NUM_FALSES.idx()]);
+ row1[SUM_NDV_LONG.idx()] =
MetastoreDirectSqlUtils.sum(row1[SUM_NDV_LONG.idx()], row2[SUM_NDV_LONG.idx()]);
+ row1[SUM_NDV_DOUBLE.idx()] =
MetastoreDirectSqlUtils.sum(row1[SUM_NDV_DOUBLE.idx()],
row2[SUM_NDV_DOUBLE.idx()]);
+ row1[SUM_NDV_DECIMAL.idx()] =
MetastoreDirectSqlUtils.sum(row1[SUM_NDV_DECIMAL.idx()],
row2[SUM_NDV_DECIMAL.idx()]);
+ row1[COUNT_ROWS.idx()] =
MetastoreDirectSqlUtils.sum(row1[COUNT_ROWS.idx()], row2[COUNT_ROWS.idx()]);
+ row1[SUM_NUM_DISTINCTS.idx()] =
MetastoreDirectSqlUtils.sum(row1[SUM_NUM_DISTINCTS.idx()],
row2[SUM_NUM_DISTINCTS.idx()]);
+ }
+
+ private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(
+ Object[] row,
+ boolean useDensityFunctionForNDVEstimation, double ndvTuner)
+ throws MetaException {
+ if (row == null) {
+ return null;
+ }
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ ColumnStatisticsObj cso = new ColumnStatisticsObj((String)
row[COLNAME.idx()], (String) row[COLTYPE.idx()], data);
+ Object avgLong = MetastoreDirectSqlUtils.divide(row[SUM_NDV_LONG.idx()],
row[COUNT_ROWS.idx()]);
+ Object avgDouble =
MetastoreDirectSqlUtils.divide(row[SUM_NDV_DOUBLE.idx()],
row[COUNT_ROWS.idx()]);
+ Object avgDecimal =
MetastoreDirectSqlUtils.divide(row[SUM_NDV_DECIMAL.idx()],
row[COUNT_ROWS.idx()]);
+ StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data,
row[LONG_LOW_VALUE.idx()],
+ row[LONG_HIGH_VALUE.idx()], row[DOUBLE_LOW_VALUE.idx()],
row[DOUBLE_HIGH_VALUE.idx()], row[BIG_DECIMAL_LOW_VALUE.idx()],
row[BIG_DECIMAL_HIGH_VALUE.idx()],
+ row[NUM_NULLS.idx()], row[NUM_DISTINCTS.idx()],
row[AVG_COL_LEN.idx()], row[MAX_COL_LEN.idx()], row[NUM_TRUES.idx()],
row[NUM_FALSES.idx()],
+ avgLong, avgDouble, avgDecimal, row[SUM_NUM_DISTINCTS.idx()],
+ useDensityFunctionForNDVEstimation, ndvTuner);
+ return cso;
+ }
+
+ private ColumnStatisticsObj prepareCSObj(Object[] row, int i) throws
MetaException {
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++],
(String)row[i++], data);
+ Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh =
row[i++],
+ declow = row[i++], dechigh = row[i++], nulls = row[i++], dist =
row[i++], bitVector = row[i++],
+ histogram = row[i++], avglen = row[i++], maxlen = row[i++], trues =
row[i++], falses = row[i];
+ StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data,
+ llow, lhigh, dlow, dhigh, declow, dechigh, nulls, dist, bitVector,
histogram, avglen, maxlen, trues, falses);
+ return cso;
+ }
+
+ private Batchable<String, Object[]> columnWisePartitionBatcher(
+ final String queryText0, final String catName, final String dbName,
+ final String tableName, final List<String> partNames, final String
engine,
+ final boolean doTrace) {
+ return new Batchable<String, Object[]>() {
+ @Override
+ public List<Object[]> run(final List<String> inputColNames)
+ throws MetaException {
+ Batchable<String, Object[]> partitionBatchesFetcher = new
Batchable<String, Object[]>() {
+ @Override
+ public List<Object[]> run(List<String> inputPartNames)
+ throws MetaException {
+ String queryText =
+ String.format(queryText0, makeParams(inputColNames.size()),
makeParams(inputPartNames.size()));
+ long start = doTrace ? System.nanoTime() : 0;
+ Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
+ try {
+ Object qResult = executeWithArray(query,
+ prepareParams(catName, dbName, tableName, inputPartNames,
inputColNames, engine), queryText);
+ long end = doTrace ? System.nanoTime() : 0;
+ MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start,
end);
+ if (qResult == null) {
+ return Collections.emptyList();
+ }
+ return MetastoreDirectSqlUtils.ensureList(qResult);
+ } finally {
+ addQueryAfterUse(query);
+ }
+ }
+ };
+ try {
+ return Batchable.runBatched(batchSize, partNames,
partitionBatchesFetcher);
+ } finally {
+ addQueryAfterUse(partitionBatchesFetcher);
+ }
+ }
+ };
+ }
+
+ public List<MetaStoreServerUtils.ColStatsObjWithSourceInfo>
getColStatsForAllTablePartitions(
+ String catName, String dbName,
+ boolean enableBitVector, boolean enableKll) throws MetaException {
+ String queryText = "select \"TBLS\".\"TBL_NAME\",
\"PARTITIONS\".\"PART_NAME\", "
+ + getStatsList(enableBitVector, enableKll)
+ + " from " + PART_COL_STATS
+ + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " +
TBLS + ".\"TBL_ID\""
+ + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\""
+ + " where " + DBS + ".\"NAME\" = ? and " + DBS + ".\"CTLG_NAME\" = ?";
+ long start = 0;
+ long end = 0;
+ boolean doTrace = LOG.isDebugEnabled();
+ Object qResult = null;
+ start = doTrace ? System.nanoTime() : 0;
+ List<MetaStoreServerUtils.ColStatsObjWithSourceInfo> colStatsForDB = new
ArrayList<>();
+ try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+ qResult = executeWithArray(query.getInnerQuery(), new Object[] { dbName,
catName }, queryText);
+ if (qResult == null) {
+ return colStatsForDB;
+ }
+ end = doTrace ? System.nanoTime() : 0;
+ MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
+ List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
+ for (Object[] row : list) {
+ String tblName = (String) row[0];
+ String partName = (String) row[1];
+ ColumnStatisticsObj colStatObj = prepareCSObj(row, 2);
+ colStatsForDB.add(
+ new MetaStoreServerUtils.ColStatsObjWithSourceInfo(colStatObj,
catName, dbName, tblName, partName));
+ Deadline.checkTimeout();
+ }
+ }
+ return colStatsForDB;
+ }
+
+ public List<ColumnStatistics> getPartitionStats(
+ final String catName, final String dbName, final String tableName, final
List<String> partNames,
+ List<String> colNames, String engine, boolean enableBitVector, boolean
enableKll) throws MetaException {
+ if (colNames.isEmpty() || partNames.isEmpty()) {
+ return Collections.emptyList();
+ }
+ final boolean doTrace = LOG.isDebugEnabled();
+ final String queryText0 = "select \"PARTITIONS\".\"PART_NAME\", " +
getStatsList(enableBitVector, enableKll)
+ + " from " + PART_COL_STATS
+ + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " +
TBLS + ".\"TBL_ID\""
+ + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\""
+ + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ?
and " + TBLS + ".\"TBL_NAME\" = ? "
+ + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
+ + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
+ + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+ + " order by " + PARTITIONS + ".\"PART_NAME\"";
+ Batchable<String, Object[]> b =
+ columnWisePartitionBatcher(queryText0, catName, dbName, tableName,
partNames, engine, doTrace);
+ List<ColumnStatistics> result = new ArrayList<>(partNames.size());
+ String lastPartName = null;
+ int from = 0;
+ try {
+ List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
+ for (int i = 0; i <= list.size(); ++i) {
+ boolean isLast = i == list.size();
+ String partName = isLast ? null : (String) list.get(i)[0];
+ if (!isLast && partName.equals(lastPartName)) {
+ continue;
+ } else if (from != i) {
+ ColumnStatisticsDesc csd =
+ new ColumnStatisticsDesc(false, dbName, tableName);
+ csd.setCatName(catName);
+ csd.setPartName(lastPartName);
+ result.add(makeColumnStats(list.subList(from, i), csd, 1, engine));
+ }
+ lastPartName = partName;
+ from = i;
+ Deadline.checkTimeout();
+ }
+ } finally {
+ b.closeAllQueries();
+ }
+ return result;
+ }
+
+ private ColumnStatistics makeColumnStats(
+ List<Object[]> list, ColumnStatisticsDesc csd,
+ int offset, String engine) throws MetaException {
+ ColumnStatistics result = new ColumnStatistics();
+ result.setStatsDesc(csd);
+ List<ColumnStatisticsObj> csos = new ArrayList<>(list.size());
+ for (Object[] row : list) {
+ // LastAnalyzed is stored per column but thrift has it per several;
+ // get the lowest for now as nobody actually uses this field.
+ Object laObj = row[offset + 16]; // 16 is the offset of "last analyzed"
field
+ if (laObj != null && (!csd.isSetLastAnalyzed() ||
+ csd.getLastAnalyzed() > MetastoreDirectSqlUtils
+ .extractSqlLong(laObj))) {
+ csd.setLastAnalyzed(MetastoreDirectSqlUtils.extractSqlLong(laObj));
+ }
+ csos.add(prepareCSObj(row, offset));
+ Deadline.checkTimeout();
+ }
+ result.setStatsObj(csos);
+ result.setEngine(engine);
+ return result;
+ }
+
+ /**
+ * The common query part for table and partition stats
+ */
+ private String getStatsList(boolean enableBitVector, boolean enableKll) {
+ return "\"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\",
\"LONG_HIGH_VALUE\", "
+ + "\"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\",
\"BIG_DECIMAL_LOW_VALUE\", "
+ + "\"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", \"NUM_DISTINCTS\", "
+ + (enableBitVector ? "\"BIT_VECTOR\", " : "\'\', ")
+ + (enableKll ? "\"HISTOGRAM\", " : "\'\', ")
+ + "\"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\",
\"LAST_ANALYZED\" ";
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
index 15924396932..4473067ad09 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
@@ -89,7 +89,7 @@ class DirectSqlUpdatePart {
private final int maxBatchSize;
private final SQLGenerator sqlGenerator;
- public DirectSqlUpdatePart(PersistenceManager pm, Configuration conf,
+ DirectSqlUpdatePart(PersistenceManager pm, Configuration conf,
DatabaseProduct dbType, int batchSize) {
this.pm = pm;
this.conf = conf;
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IExtrapolatePartStatus.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IExtrapolatePartStatus.java
index aa50e91076c..4627eebd3be 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IExtrapolatePartStatus.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IExtrapolatePartStatus.java
@@ -22,6 +22,40 @@
import java.util.Map;
public interface IExtrapolatePartStatus {
+ /** Zero-based column positions within an aggregated column-stats result row (presumably the select-list order of the aggregation query in DirectSqlAggrStats -- verify when changing that query). */
+ enum DBStatsAggrIndices {
+ COLNAME(0),
+ COLTYPE(1),
+ LONG_LOW_VALUE(2),
+ LONG_HIGH_VALUE(3),
+ DOUBLE_LOW_VALUE(4),
+ DOUBLE_HIGH_VALUE(5),
+ BIG_DECIMAL_LOW_VALUE(6),
+ BIG_DECIMAL_HIGH_VALUE(7),
+ NUM_NULLS(8),
+ NUM_DISTINCTS(9),
+ AVG_COL_LEN(10),
+ MAX_COL_LEN(11),
+ NUM_TRUES(12),
+ NUM_FALSES(13),
+ SUM_NDV_LONG(14),
+ SUM_NDV_DOUBLE(15),
+ SUM_NDV_DECIMAL(16),
+ COUNT_ROWS(17),
+ SUM_NUM_DISTINCTS(18);
+
+ private final int idx;
+
+ DBStatsAggrIndices(int idx) {
+ this.idx = idx;
+ }
+
+ public int idx() { // zero-based position of this stat in the result row
+ return idx;
+ }
+
+ }
+
/**
* The sequence of colStatNames.
*/
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 2d22278326a..49d187505dc 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -20,7 +20,6 @@
import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.commons.lang3.StringUtils.normalizeSpace;
-import static org.apache.commons.lang3.StringUtils.repeat;
import static org.apache.hadoop.hive.metastore.ColumnType.BIGINT_TYPE_NAME;
import static org.apache.hadoop.hive.metastore.ColumnType.CHAR_TYPE_NAME;
import static org.apache.hadoop.hive.metastore.ColumnType.DATE_TYPE_NAME;
@@ -30,6 +29,9 @@
import static org.apache.hadoop.hive.metastore.ColumnType.TIMESTAMP_TYPE_NAME;
import static org.apache.hadoop.hive.metastore.ColumnType.TINYINT_TYPE_NAME;
import static org.apache.hadoop.hive.metastore.ColumnType.VARCHAR_TYPE_NAME;
+import static
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.getFullyQualifiedName;
+import static
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.makeParams;
+import static
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.prepareParams;
import java.sql.Connection;
import java.sql.SQLException;
@@ -63,8 +65,6 @@
import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.DatabaseType;
@@ -115,7 +115,6 @@
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
-import
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hive.common.util.BloomFilter;
import org.datanucleus.store.rdbms.query.ForwardQueryResult;
@@ -167,6 +166,7 @@ class MetaStoreDirectSql {
private AggregateStatsCache aggrStatsCache;
private DirectSqlUpdatePart directSqlUpdatePart;
private DirectSqlInsertPart directSqlInsertPart;
+ private DirectSqlAggrStats directSqlAggrStats;
/**
* This method returns a comma separated string consisting of String values
of a given list.
@@ -206,6 +206,7 @@ public MetaStoreDirectSql(PersistenceManager pm,
Configuration conf, String sche
this.batchSize = batchSize;
this.isTxnStatsEnabled = MetastoreConf.getBoolVar(conf,
ConfVars.HIVE_TXN_STATS_ENABLED);
this.directSqlUpdatePart = new DirectSqlUpdatePart(pm, conf, dbType,
batchSize);
+ this.directSqlAggrStats = new DirectSqlAggrStats(pm, conf, schema);
ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder =
new ImmutableMap.Builder<>();
@@ -267,11 +268,6 @@ public MetaStoreDirectSql(PersistenceManager pm,
Configuration conf, String sche
.build();
}
- private static String getFullyQualifiedName(String schema, String tblName) {
- return ((schema == null || schema.isEmpty()) ? "" : "\"" + schema +
"\".\"")
- + "\"" + tblName + "\"";
- }
-
private boolean ensureDbInit() {
Transaction tx = pm.currentTransaction();
boolean doCommit = false;
@@ -1612,73 +1608,6 @@ public void visit(LeafNode node) throws MetaException {
}
}
- /**
- * Retrieve the column statistics for the specified columns of the table.
NULL
- * is returned if the columns are not provided.
- * @param catName the catalog name of the table
- * @param dbName the database name of the table
- * @param tableName the table name
- * @param colNames the list of the column names
- * @param engine engine making the request
- * @return the column statistics for the specified columns
- * @throws MetaException
- */
- public ColumnStatistics getTableStats(final String catName, final String
dbName,
- final String tableName, List<String> colNames, String engine,
- boolean enableBitVector, boolean enableKll) throws MetaException {
- if (colNames == null || colNames.isEmpty()) {
- return null;
- }
- final boolean doTrace = LOG.isDebugEnabled();
- final String queryText0 = "select " + getStatsList(enableBitVector,
enableKll) + " from " + TAB_COL_STATS
- + " inner join " + TBLS + " on " + TAB_COL_STATS + ".\"TBL_ID\" = "
+ TBLS + ".\"TBL_ID\" "
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\" "
- + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ?
and " + TBLS + ".\"TBL_NAME\" = ?"
- + " and \"ENGINE\" = ? and \"COLUMN_NAME\" in (";
- Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
- @Override
- public List<Object[]> run(List<String> input) throws MetaException {
- String queryText = queryText0 + makeParams(input.size()) + ")";
- Object[] params = new Object[input.size() + 4];
- params[0] = catName;
- params[1] = dbName;
- params[2] = tableName;
- params[3] = engine;
- for (int i = 0; i < input.size(); ++i) {
- params[i + 4] = input.get(i);
- }
- long start = doTrace ? System.nanoTime() : 0;
- Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
- try {
- Object qResult = executeWithArray(query, params, queryText);
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0 + "...)",
start, (doTrace ? System.nanoTime() : 0));
- if (qResult == null) {
- return null;
- }
- return MetastoreDirectSqlUtils.ensureList(qResult);
- } finally {
- addQueryAfterUse(query);
- }
- }
- };
- List<Object[]> list;
- try {
- list = Batchable.runBatched(batchSize, colNames, b);
- if (list != null) {
- list = new ArrayList<>(list);
- }
- } finally {
- b.closeAllQueries();
- }
-
- if (list == null || list.isEmpty()) {
- return null;
- }
- ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName,
tableName);
- csd.setCatName(catName);
- return makeColumnStats(list, csd, 0, engine);
- }
-
public List<HiveObjectPrivilege> getTableAllColumnGrants(String catName,
String dbName,
String tableName,
String authorizer) throws MetaException {
// These constants should match the SELECT clause of the query.
@@ -1796,7 +1725,7 @@ public AggrStats aggrColStatsForPartitions(String
catName, String dbName, String
colNamesForDB.add(colName);
// Read aggregated stats for one column
colStatsAggrFromDB =
- columnStatisticsObjForPartitions(catName, dbName, tableName,
partNames, colNamesForDB, engine,
+ directSqlAggrStats.columnStatisticsObjForPartitions(catName,
dbName, tableName, partNames, colNamesForDB, engine,
partsFound, useDensityFunctionForNDVEstimation, ndvTuner,
enableBitVector, enableKll);
if (!colStatsAggrFromDB.isEmpty()) {
ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0);
@@ -1809,7 +1738,7 @@ public AggrStats aggrColStatsForPartitions(String
catName, String dbName, String
} else {
partsFound = partsFoundForPartitions(catName, dbName, tableName,
partNames, colNames, engine);
colStatsList =
- columnStatisticsObjForPartitions(catName, dbName, tableName,
partNames, colNames, engine, partsFound,
+ directSqlAggrStats.columnStatisticsObjForPartitions(catName, dbName,
tableName, partNames, colNames, engine, partsFound,
useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector,
enableKll);
}
LOG.debug("useDensityFunctionForNDVEstimation = " +
useDensityFunctionForNDVEstimation
@@ -1876,540 +1805,6 @@ public List<Long> run(List<String> inputPartNames)
throws MetaException {
return partsFound;
}
- private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(
- final String catName, final String dbName, final String tableName, final
List<String> partNames,
- List<String> colNames, String engine, long partsFound, final boolean
useDensityFunctionForNDVEstimation,
- final double ndvTuner, final boolean enableBitVector, boolean enableKll)
throws MetaException {
- final boolean areAllPartsFound = (partsFound == partNames.size());
- return Batchable.runBatched(batchSize, colNames, new Batchable<String,
ColumnStatisticsObj>() {
- @Override
- public List<ColumnStatisticsObj> run(final List<String> inputColNames)
throws MetaException {
- return Batchable.runBatched(batchSize, partNames, new
Batchable<String, ColumnStatisticsObj>() {
- @Override
- public List<ColumnStatisticsObj> run(List<String> inputPartNames)
throws MetaException {
- return columnStatisticsObjForPartitionsBatch(catName, dbName,
tableName, inputPartNames,
- inputColNames, engine, areAllPartsFound,
useDensityFunctionForNDVEstimation, ndvTuner,
- enableBitVector, enableKll);
- }
- });
- }
- });
- }
-
- public List<ColStatsObjWithSourceInfo>
getColStatsForAllTablePartitions(String catName, String dbName,
- boolean enableBitVector, boolean enableKll) throws MetaException {
- String queryText = "select \"TBLS\".\"TBL_NAME\",
\"PARTITIONS\".\"PART_NAME\", "
- + getStatsList(enableBitVector, enableKll)
- + " from " + PART_COL_STATS
- + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
- + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " +
TBLS + ".\"TBL_ID\""
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\""
- + " where " + DBS + ".\"NAME\" = ? and " + DBS + ".\"CTLG_NAME\" = ?";
- long start = 0;
- long end = 0;
- boolean doTrace = LOG.isDebugEnabled();
- Object qResult = null;
- start = doTrace ? System.nanoTime() : 0;
- List<ColStatsObjWithSourceInfo> colStatsForDB = new
ArrayList<ColStatsObjWithSourceInfo>();
- try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
- qResult = executeWithArray(query.getInnerQuery(), new Object[] { dbName,
catName }, queryText);
- if (qResult == null) {
- return colStatsForDB;
- }
- end = doTrace ? System.nanoTime() : 0;
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
- List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
- for (Object[] row : list) {
- String tblName = (String) row[0];
- String partName = (String) row[1];
- ColumnStatisticsObj colStatObj = prepareCSObj(row, 2);
- colStatsForDB.add(new ColStatsObjWithSourceInfo(colStatObj, catName,
dbName, tblName, partName));
- Deadline.checkTimeout();
- }
- }
- return colStatsForDB;
- }
-
- /** Should be called with the list short enough to not trip up Oracle/etc. */
- private List<ColumnStatisticsObj>
columnStatisticsObjForPartitionsBatch(String catName, String dbName,
- String tableName, List<String> partNames, List<String> colNames, String
engine,
- boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation,
double ndvTuner,
- boolean enableBitVector, boolean enableKll)
- throws MetaException {
- if (enableBitVector || enableKll) {
- return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames,
engine, areAllPartsFound,
- useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector,
enableKll);
- } else {
- return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames,
engine, areAllPartsFound,
- useDensityFunctionForNDVEstimation, ndvTuner);
- }
- }
-
- private List<ColumnStatisticsObj> aggrStatsUseJava(String catName, String
dbName, String tableName,
- List<String> partNames, List<String> colNames, String engine, boolean
areAllPartsFound,
- boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean
enableBitVector,
- boolean enableKll) throws MetaException {
- // 1. get all the stats for colNames in partNames;
- List<ColumnStatistics> partStats =
- getPartitionStats(catName, dbName, tableName, partNames, colNames,
engine, enableBitVector, enableKll);
- // 2. use util function to aggr stats
- return MetaStoreServerUtils.aggrPartitionStats(partStats, catName, dbName,
tableName, partNames, colNames,
- areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
- }
-
- private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String
dbName,
- String tableName, List<String> partNames, List<String> colNames, String
engine,
- boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation,
double ndvTuner) throws MetaException {
- // TODO: all the extrapolation logic should be moved out of this class,
- // only mechanical data retrieval should remain here.
- String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
- + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"),
min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
- + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)),
max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
- + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
- + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"),
sum(\"NUM_FALSES\"), "
- // The following data is used to compute a partitioned table's NDV
based
- // on partitions' NDV when useDensityFunctionForNDVEstimation = true.
Global NDVs cannot be
- // accurately derived from partition NDVs, because the domain of
column value two partitions
- // can overlap. If there is no overlap then global NDV is just the sum
- // of partition NDVs (UpperBound). But if there is some overlay then
- // global NDV can be anywhere between sum of partition NDVs (no
overlap)
- // and same as one of the partition NDV (domain of column value in all
other
- // partitions is subset of the domain value in one of the partition)
- // (LowerBound).But under uniform distribution, we can roughly
estimate the global
- // NDV by leveraging the min/max values.
- // And, we also guarantee that the estimation makes sense by comparing
it to the
- // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
- // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
- + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\"
as decimal)),"
- +
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
- + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
- + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
- + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
- + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " +
TBLS + ".\"TBL_ID\""
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\""
- + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ?
and " + TBLS + ".\"TBL_NAME\" = ? ";
- String queryText = null;
- long start = 0;
- long end = 0;
-
- boolean doTrace = LOG.isDebugEnabled();
- ForwardQueryResult<?> fqr = null;
- // Check if the status of all the columns of all the partitions exists
- // Extrapolation is not needed.
- if (areAllPartsFound) {
- queryText = commonPrefix + " and \"COLUMN_NAME\" in (" +
makeParams(colNames.size()) + ")"
- + " and " + PARTITIONS + ".\"PART_NAME\" in (" +
makeParams(partNames.size()) + ")"
- + " and \"ENGINE\" = ? "
- + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
- start = doTrace ? System.nanoTime() : 0;
- try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
- Object qResult = executeWithArray(query.getInnerQuery(),
- prepareParams(catName, dbName, tableName, partNames, colNames,
- engine), queryText);
- if (qResult == null) {
- return Collections.emptyList();
- }
- end = doTrace ? System.nanoTime() : 0;
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
- List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
- List<ColumnStatisticsObj> colStats =
- new ArrayList<ColumnStatisticsObj>(list.size());
- for (Object[] row : list) {
- colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
- useDensityFunctionForNDVEstimation, ndvTuner));
- Deadline.checkTimeout();
- }
- return colStats;
- }
- } else {
- // Extrapolation is needed for some columns.
- // In this case, at least a column status for a partition is missing.
- // We need to extrapolate this partition based on the other partitions
- List<ColumnStatisticsObj> colStats = new
ArrayList<ColumnStatisticsObj>(colNames.size());
- queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\",
count(\"PART_COL_STATS\".\"PART_ID\") "
- + " from " + PART_COL_STATS
- + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
- + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " +
TBLS + ".\"TBL_ID\""
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\""
- + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ?
and " + TBLS + ".\"TBL_NAME\" = ? "
- + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" +
makeParams(colNames.size()) + ")"
- + " and " + PARTITIONS + ".\"PART_NAME\" in (" +
makeParams(partNames.size()) + ")"
- + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
- + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " +
PART_COL_STATS + ".\"COLUMN_TYPE\"";
- start = doTrace ? System.nanoTime() : 0;
- List<String> noExtraColumnNames = new ArrayList<String>();
- Map<String, String[]> extraColumnNameTypeParts = new HashMap<String,
String[]>();
- try(QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
- Object qResult = executeWithArray(query.getInnerQuery(),
- prepareParams(catName, dbName, tableName, partNames, colNames,
- engine), queryText);
- end = doTrace ? System.nanoTime() : 0;
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
- if (qResult == null) {
- return Collections.emptyList();
- }
-
- List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
- for (Object[] row : list) {
- String colName = (String) row[0];
- String colType = (String) row[1];
- // Extrapolation is not needed for this column if
- // count(\"PARTITION_NAME\")==partNames.size()
- // Or, extrapolation is not possible for this column if
- // count(\"PARTITION_NAME\")<2
- Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
- if (count == partNames.size() || count < 2) {
- noExtraColumnNames.add(colName);
- } else {
- extraColumnNameTypeParts.put(colName, new String[] {colType,
String.valueOf(count)});
- }
- Deadline.checkTimeout();
- }
- }
- // Extrapolation is not needed for columns noExtraColumnNames
- List<Object[]> list;
- if (noExtraColumnNames.size() != 0) {
- queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
- + makeParams(noExtraColumnNames.size()) + ")" + " and
\"PARTITION_NAME\" in ("
- + makeParams(partNames.size()) + ")"
- + " and \"ENGINE\" = ? "
- + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
- start = doTrace ? System.nanoTime() : 0;
-
- try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
- Object qResult = executeWithArray(query.getInnerQuery(),
- prepareParams(catName, dbName, tableName, partNames,
noExtraColumnNames, engine), queryText);
- if (qResult == null) {
- return Collections.emptyList();
- }
- list = MetastoreDirectSqlUtils.ensureList(qResult);
- for (Object[] row : list) {
- colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
- useDensityFunctionForNDVEstimation, ndvTuner));
- Deadline.checkTimeout();
- }
- end = doTrace ? System.nanoTime() : 0;
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
- }
- }
- // Extrapolation is needed for extraColumnNames.
- // give a sequence number for all the partitions
- if (extraColumnNameTypeParts.size() != 0) {
- Map<String, Integer> indexMap = new HashMap<String, Integer>();
- for (int index = 0; index < partNames.size(); index++) {
- indexMap.put(partNames.get(index), index);
- }
- // get sum for all columns to reduce the number of queries
- Map<String, Map<Integer, Object>> sumMap = new HashMap<String,
Map<Integer, Object>>();
- queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"),
sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
- + " from " + PART_COL_STATS
- + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
- + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " +
TBLS + ".\"TBL_ID\""
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\""
- + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" =
? and " + TBLS + ".\"TBL_NAME\" = ? "
- + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" +
makeParams(extraColumnNameTypeParts.size()) + ")"
- + " and " + PARTITIONS + ".\"PART_NAME\" in (" +
makeParams(partNames.size()) + ")"
- + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
- + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\"";
- start = doTrace ? System.nanoTime() : 0;
- try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
- List<String> extraColumnNames = new ArrayList<String>();
- extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
- Object qResult = executeWithArray(query.getInnerQuery(),
- prepareParams(catName, dbName, tableName, partNames,
- extraColumnNames, engine), queryText);
- if (qResult == null) {
- return Collections.emptyList();
- }
- list = MetastoreDirectSqlUtils.ensureList(qResult);
- // see the indexes for colstats in IExtrapolatePartStatus
- Integer[] sumIndex = new Integer[] {6, 10, 11, 15};
- for (Object[] row : list) {
- Map<Integer, Object> indexToObject = new HashMap<Integer,
Object>();
- for (int ind = 1; ind < row.length; ind++) {
- indexToObject.put(sumIndex[ind - 1], row[ind]);
- }
- // row[0] is the column name
- sumMap.put((String) row[0], indexToObject);
- Deadline.checkTimeout();
- }
- end = doTrace ? System.nanoTime() : 0;
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
- }
- for (Map.Entry<String, String[]> entry :
extraColumnNameTypeParts.entrySet()) {
- Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length
+ 2];
- String colName = entry.getKey();
- String colType = entry.getValue()[0];
- Long sumVal = Long.parseLong(entry.getValue()[1]);
- // fill in colname
- row[0] = colName;
- // fill in coltype
- row[1] = colType;
- // use linear extrapolation. more complicated one can be added in the
- // future.
- IExtrapolatePartStatus extrapolateMethod = new
LinearExtrapolatePartStatus();
- // fill in colstatus
- Integer[] index = null;
- boolean decimal = false;
- if (colType.toLowerCase().startsWith("decimal")) {
- index = IExtrapolatePartStatus.indexMaps.get("decimal");
- decimal = true;
- } else {
- index =
IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
- }
- // if the colType is not the known type, long, double, etc, then get
- // all index.
- if (index == null) {
- index = IExtrapolatePartStatus.indexMaps.get("default");
- }
- for (int colStatIndex : index) {
- String colStatName =
IExtrapolatePartStatus.colStatNames[colStatIndex];
- // if the aggregation type is sum, we do a scale-up
- if (IExtrapolatePartStatus.aggrTypes[colStatIndex] ==
IExtrapolatePartStatus.AggrType.Sum) {
- Object o = sumMap.get(colName).get(colStatIndex);
- if (o == null) {
- row[2 + colStatIndex] = null;
- } else {
- Long val = MetastoreDirectSqlUtils.extractSqlLong(o);
- row[2 + colStatIndex] = val / sumVal * (partNames.size());
- }
- } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] ==
IExtrapolatePartStatus.AggrType.Min
- || IExtrapolatePartStatus.aggrTypes[colStatIndex] ==
IExtrapolatePartStatus.AggrType.Max) {
- // if the aggregation type is min/max, we extrapolate from the
- // left/right borders
- if (!decimal) {
- queryText = "select \"" + colStatName + "\",\"PART_NAME\" from
" + PART_COL_STATS
- + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
- + " inner join " + TBLS + " on " + PARTITIONS +
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " +
DBS + ".\"DB_ID\""
- + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS +
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
- + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
- + " and " + PARTITIONS + ".\"PART_NAME\" in (" +
makeParams(partNames.size()) + ")"
- + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
- + " order by \"" + colStatName + "\"";
- } else {
- queryText = "select \"" + colStatName + "\",\"PART_NAME\" from
" + PART_COL_STATS
- + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
- + " inner join " + TBLS + " on " + PARTITIONS +
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " +
DBS + ".\"DB_ID\""
- + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS +
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
- + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
- + " and " + PARTITIONS + ".\"PART_NAME\" in (" +
makeParams(partNames.size()) + ")"
- + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
- + " order by cast(\"" + colStatName + "\" as decimal)";
- }
- start = doTrace ? System.nanoTime() : 0;
- try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
- Object qResult = executeWithArray(query.getInnerQuery(),
- prepareParams(catName, dbName, tableName, partNames,
Arrays.asList(colName), engine), queryText);
- if (qResult == null) {
- return Collections.emptyList();
- }
- fqr = (ForwardQueryResult<?>) qResult;
- Object[] min = (Object[]) (fqr.get(0));
- Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
- end = doTrace ? System.nanoTime() : 0;
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start,
end);
- if (min[0] == null || max[0] == null) {
- row[2 + colStatIndex] = null;
- } else {
- row[2 + colStatIndex] = extrapolateMethod
- .extrapolate(min, max, colStatIndex, indexMap);
- }
- }
- } else {
- // if the aggregation type is avg, we use the average on the
existing ones.
- queryText = "select "
- +
"avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as
decimal)),"
- +
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
- + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
- + " from " + PART_COL_STATS + ""
- + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
- + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\"
= " + TBLS + ".\"TBL_ID\""
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " +
DBS + ".\"DB_ID\""
- + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS +
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
- + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
- + " and " + PARTITIONS + ".\"PART_NAME\" in (" +
makeParams(partNames.size()) + ")"
- + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
- + " group by \"COLUMN_NAME\"";
- start = doTrace ? System.nanoTime() : 0;
- try(QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
- Object qResult = executeWithArray(query.getInnerQuery(),
- prepareParams(catName, dbName, tableName, partNames,
Arrays.asList(colName), engine), queryText);
- if (qResult == null) {
- return Collections.emptyList();
- }
- fqr = (ForwardQueryResult<?>) qResult;
- Object[] avg = (Object[]) (fqr.get(0));
- // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
- // "AVG_DECIMAL"
- row[2 + colStatIndex] = avg[colStatIndex - 12];
- end = doTrace ? System.nanoTime() : 0;
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start,
end);
- }
- }
- }
- colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
useDensityFunctionForNDVEstimation, ndvTuner));
- Deadline.checkTimeout();
- }
- }
- return colStats;
- }
- }
-
- private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws
MetaException {
- ColumnStatisticsData data = new ColumnStatisticsData();
- ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++],
(String)row[i++], data);
- Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh =
row[i++],
- declow = row[i++], dechigh = row[i++], nulls = row[i++], dist =
row[i++], bitVector = row[i++],
- histogram = row[i++], avglen = row[i++], maxlen = row[i++], trues =
row[i++], falses = row[i];
- StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data,
- llow, lhigh, dlow, dhigh, declow, dechigh, nulls, dist, bitVector,
histogram, avglen, maxlen, trues, falses);
- return cso;
- }
-
- private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i,
- boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws
MetaException {
- ColumnStatisticsData data = new ColumnStatisticsData();
- ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++],
(String) row[i++], data);
- Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh =
row[i++], declow = row[i++],
- dechigh = row[i++], nulls = row[i++], dist = row[i++], avglen =
row[i++], maxlen = row[i++],
- trues = row[i++], falses = row[i++], avgLong = row[i++], avgDouble =
row[i++],
- avgDecimal = row[i++], sumDist = row[i++];
- StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow,
lhigh, dlow, dhigh,
- declow, dechigh, nulls, dist, avglen, maxlen, trues, falses, avgLong,
avgDouble,
- avgDecimal, sumDist, useDensityFunctionForNDVEstimation, ndvTuner);
- return cso;
- }
-
- private Object[] prepareParams(String catName, String dbName, String
tableName,
- List<String> partNames, List<String> colNames, String engine) throws
MetaException {
- Object[] params = new Object[colNames.size() + partNames.size() + 4];
- int paramI = 0;
- params[paramI++] = catName;
- params[paramI++] = dbName;
- params[paramI++] = tableName;
- for (String colName : colNames) {
- params[paramI++] = colName;
- }
- for (String partName : partNames) {
- params[paramI++] = partName;
- }
- params[paramI++] = engine;
-
- return params;
- }
-
- public List<ColumnStatistics> getPartitionStats(
- final String catName, final String dbName, final String tableName, final
List<String> partNames,
- List<String> colNames, String engine, boolean enableBitVector, boolean
enableKll) throws MetaException {
- if (colNames.isEmpty() || partNames.isEmpty()) {
- return Collections.emptyList();
- }
- final boolean doTrace = LOG.isDebugEnabled();
- final String queryText0 = "select \"PARTITIONS\".\"PART_NAME\", " +
getStatsList(enableBitVector, enableKll)
- + " from " + PART_COL_STATS
- + " inner join " + PARTITIONS + " on " + PART_COL_STATS +
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
- + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " +
TBLS + ".\"TBL_ID\""
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\""
- + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ?
and " + TBLS + ".\"TBL_NAME\" = ? "
- + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
- + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
- + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
- + " order by " + PARTITIONS + ".\"PART_NAME\"";
- Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
- @Override
- public List<Object[]> run(final List<String> inputColNames) throws
MetaException {
- Batchable<String, Object[]> b2 = new Batchable<String, Object[]>() {
- @Override
- public List<Object[]> run(List<String> inputPartNames) throws
MetaException {
- String queryText = String.format(queryText0,
- makeParams(inputColNames.size()),
makeParams(inputPartNames.size()));
- long start = doTrace ? System.nanoTime() : 0;
- Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
- try {
- Object qResult = executeWithArray(query, prepareParams(
- catName, dbName, tableName, inputPartNames, inputColNames,
engine), queryText);
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start,
(doTrace ? System.nanoTime() : 0));
- if (qResult == null) {
- return Collections.emptyList();
- }
- return MetastoreDirectSqlUtils.ensureList(qResult);
- } finally {
- addQueryAfterUse(query);
- }
- }
- };
- try {
- return Batchable.runBatched(batchSize, partNames, b2);
- } finally {
- addQueryAfterUse(b2);
- }
- }
- };
-
- List<ColumnStatistics> result = new
ArrayList<ColumnStatistics>(partNames.size());
- String lastPartName = null;
- int from = 0;
- try {
- List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
- for (int i = 0; i <= list.size(); ++i) {
- boolean isLast = i == list.size();
- String partName = isLast ? null : (String) list.get(i)[0];
- if (!isLast && partName.equals(lastPartName)) {
- continue;
- } else if (from != i) {
- ColumnStatisticsDesc csd =
- new ColumnStatisticsDesc(false, dbName, tableName);
- csd.setCatName(catName);
- csd.setPartName(lastPartName);
- result.add(makeColumnStats(list.subList(from, i), csd, 1, engine));
- }
- lastPartName = partName;
- from = i;
- Deadline.checkTimeout();
- }
- } finally {
- b.closeAllQueries();
- }
- return result;
- }
-
- /** The common query part for table and partition stats */
- private String getStatsList(boolean enableBitVector, boolean enableKll) {
- return "\"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\",
\"LONG_HIGH_VALUE\", "
- + "\"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\",
\"BIG_DECIMAL_LOW_VALUE\", "
- + "\"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", \"NUM_DISTINCTS\", "
- + (enableBitVector ? "\"BIT_VECTOR\", " : "\'\', ")
- + (enableKll ? "\"HISTOGRAM\", " : "\'\', ")
- + "\"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\",
\"LAST_ANALYZED\" ";
- }
-
- private ColumnStatistics makeColumnStats(
- List<Object[]> list, ColumnStatisticsDesc csd, int offset, String
engine) throws MetaException {
- ColumnStatistics result = new ColumnStatistics();
- result.setStatsDesc(csd);
- List<ColumnStatisticsObj> csos = new
ArrayList<ColumnStatisticsObj>(list.size());
- for (Object[] row : list) {
- // LastAnalyzed is stored per column but thrift has it per several;
- // get the lowest for now as nobody actually uses this field.
- Object laObj = row[offset + 16]; // 16 is the offset of "last analyzed"
field
- if (laObj != null && (!csd.isSetLastAnalyzed() || csd.getLastAnalyzed()
> MetastoreDirectSqlUtils
- .extractSqlLong(laObj))) {
- csd.setLastAnalyzed(MetastoreDirectSqlUtils.extractSqlLong(laObj));
- }
- csos.add(prepareCSObj(row, offset));
- Deadline.checkTimeout();
- }
- result.setStatsObj(csos);
- result.setEngine(engine);
- return result;
- }
-
- private String makeParams(int size) {
- // W/ size 0, query will fail, but at least we'd get to see the query in
debug output.
- return (size == 0) ? "" : repeat(",?", size).substring(1);
- }
@SuppressWarnings("unchecked")
private <T> T executeWithArray(Query query, Object[] params, String sql)
throws MetaException {
@@ -2838,9 +2233,10 @@ public List<SQLCheckConstraint>
getCheckConstraints(String catName, String db_na
/**
* Drop partitions by using direct SQL queries.
- * @param catName Metastore catalog name.
- * @param dbName Metastore db name.
- * @param tblName Metastore table name.
+ *
+ * @param catName Metastore catalog name.
+ * @param dbName Metastore db name.
+ * @param tblName Metastore table name.
* @param partNames Partition names to get.
* @return List of partitions.
*/
@@ -2923,6 +2319,7 @@ public List<Void> run(List<Long> input) throws Exception {
/**
* Drops Partition-s. Should be called with the list short enough to not
trip up Oracle/etc.
+ *
* @param partitionIdList The partition identifiers to drop
* @throws MetaException If there is an SQL exception during the execution
it converted to
* MetaException
@@ -3007,6 +2404,7 @@ private void dropPartitionsByPartitionIds(List<Long>
partitionIdList) throws Met
/**
* Drops SD-s. Should be called with the list short enough to not trip up
Oracle/etc.
+ *
* @param storageDescriptorIdList The storage descriptor identifiers to drop
* @throws MetaException If there is an SQL exception during the execution
it converted to
* MetaException
@@ -3095,6 +2493,7 @@ private void dropStorageDescriptors(List<Object>
storageDescriptorIdList) throws
/**
* Drops Serde-s. Should be called with the list short enough to not trip up
Oracle/etc.
+ *
* @param serdeIdList The serde identifiers to drop
* @throws MetaException If there is an SQL exception during the execution
it converted to
* MetaException
@@ -3124,6 +2523,7 @@ private void dropSerdes(List<Object> serdeIdList) throws
MetaException {
/**
* Checks if the column descriptors still has references for other SD-s. If
not, then removes
* them. Should be called with the list short enough to not trip up
Oracle/etc.
+ *
* @param columnDescriptorIdList The column identifiers
* @throws MetaException If there is an SQL exception during the execution
it converted to
* MetaException
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
index 45e89ab40df..19b92f5177c 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
@@ -42,6 +42,7 @@
import javax.jdo.PersistenceManager;
import javax.jdo.Query;
import java.math.BigDecimal;
+import java.math.MathContext;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.SQLException;
@@ -53,6 +54,8 @@
import java.util.Map;
import java.util.TreeMap;
+import static org.apache.commons.lang3.StringUtils.repeat;
+
/**
* Helper utilities used by DirectSQL code in HiveMetastore.
*/
@@ -646,4 +649,72 @@ public static void throwMetaOrRuntimeException(Exception
e) throws MetaException
throw new RuntimeException(e);
}
}
+
+ /**
+ * Builds the positional bind parameters for a direct-SQL statistics query.
+ * Layout: catName, dbName, tableName, every column name, every partition name, engine.
+ * NOTE(review): the calling query must declare its '?' placeholders in exactly this order
+ * (catalog/db/table filters, column IN-list, partition IN-list, engine) - confirm against
+ * the query templates that use this helper.
+ *
+ * @param catName catalog name (first parameter)
+ * @param dbName database name (second parameter)
+ * @param tableName table name (third parameter)
+ * @param partNames partition names, bound after all column names
+ * @param colNames column names, bound right after the table name
+ * @param engine statistics engine, always the last parameter
+ * @return a new array of length colNames.size() + partNames.size() + 4
+ */
+ public static Object[] prepareParams(String catName, String dbName, String
tableName,
+ List<String> partNames, List<String>
colNames, String engine) {
+ // 4 fixed slots: catalog, database, table and engine.
+ Object[] params = new Object[colNames.size() + partNames.size() + 4];
+ int paramI = 0;
+ params[paramI++] = catName;
+ params[paramI++] = dbName;
+ params[paramI++] = tableName;
+ for (String colName : colNames) {
+ params[paramI++] = colName;
+ }
+ for (String partName : partNames) {
+ params[paramI++] = partName;
+ }
+ // Last slot; no increment needed since nothing follows.
+ params[paramI] = engine;
+
+ return params;
+ }
+
+ public static String getFullyQualifiedName(String schema, String tblName) {
+ return ((schema == null || schema.isEmpty()) ? "" : "\"" + schema +
"\".\"")
+ + "\"" + tblName + "\"";
+ }
+
+ /**
+ * Returns {@code size} comma-separated JDBC placeholders (e.g. "?,?,?" for size 3), intended
+ * for an SQL {@code IN (...)} list. Returns "" for size 0.
+ * NOTE(review): for a negative size, commons-lang3 repeat() returns "" and substring(1)
+ * then throws StringIndexOutOfBoundsException; callers are expected to pass list sizes.
+ */
+ public static String makeParams(int size) {
+ // W/ size 0, query will fail, but at least we'd get to see the query in debug output.
+ return (size == 0) ? "" : repeat(",?", size).substring(1);
+ }
+
+ /**
+ * Null-tolerant addition of two numeric values.
+ * If either operand is null the other one is returned as-is (its original type, not
+ * converted). Otherwise both are parsed from their toString() form and the exact
+ * BigDecimal sum is returned.
+ * NOTE(review): non-finite doubles (NaN/Infinity) cannot be parsed by BigDecimal and would
+ * throw NumberFormatException.
+ */
+ public static Object sum(Object first, Object second) {
+ if (first == null) {
+ return second;
+ }
+ if (second == null) {
+ return first;
+ }
+ return (new BigDecimal(first.toString())).add(new
BigDecimal(second.toString()));
+ }
+
+ /**
+ * Divides {@code dividend} by {@code divisor}, both parsed from their toString() form.
+ * Returns null when either operand is null or when the divisor is numerically zero.
+ * Zero is detected with signum() rather than equals(): BigDecimal.equals also compares
+ * scale, so a divisor such as 0.0 (scale 1) is not equal to BigDecimal("0") and would
+ * otherwise reach divide() and throw ArithmeticException.
+ * The quotient is rounded according to MathContext.DECIMAL64.
+ *
+ * @param dividend numeric value to divide; may be null
+ * @param divisor numeric value to divide by; may be null or zero
+ * @return the BigDecimal quotient, or null as described above
+ */
+ public static Object divide(Object dividend, Object divisor) {
+ if (dividend == null || divisor == null) {
+ return null;
+ }
+
+ BigDecimal divisorVal = new BigDecimal(divisor.toString());
+ if (divisorVal.signum() == 0) {
+ return null;
+ }
+ return new BigDecimal(dividend.toString()).divide(divisorVal, MathContext.DECIMAL64);
+ }
+
+ /**
+ * Null-tolerant minimum of two numeric values.
+ * If either operand is null the other one is returned as-is (its original type, not
+ * converted). Otherwise both are parsed from their toString() form and the smaller
+ * BigDecimal is returned.
+ */
+ public static Object min(Object first, Object second) {
+ if (first == null) {
+ return second;
+ }
+ if (second == null) {
+ return first;
+ }
+ return (new BigDecimal(first.toString())).min(new
BigDecimal(second.toString()));
+ }
+
+ /**
+ * Null-tolerant maximum of two numeric values.
+ * If either operand is null the other one is returned as-is (its original type, not
+ * converted). Otherwise both are parsed from their toString() form and the larger
+ * BigDecimal is returned.
+ */
+ public static Object max(Object first, Object second) {
+ if (first == null) {
+ return second;
+ }
+ if (second == null) {
+ return first;
+ }
+ return (new BigDecimal(first.toString())).max(new
BigDecimal(second.toString()));
+ }
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 3820c8c500e..8a2c94a6443 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -327,6 +327,7 @@ private enum TXN_STATUS {
protected PersistenceManager pm = null;
protected SQLGenerator sqlGenerator = null;
private MetaStoreDirectSql directSql = null;
+ private DirectSqlAggrStats directSqlAggrStats;
protected DatabaseProduct dbType = null;
private PartitionExpressionProxy expressionProxy = null;
protected Configuration conf;
@@ -365,6 +366,7 @@ public void setConf(Configuration conf) {
// most recent instance of the pmf
pm = null;
directSql = null;
+ directSqlAggrStats = null;
expressionProxy = null;
openTrasactionCalls = 0;
currentTransaction = null;
@@ -407,6 +409,7 @@ private void initialize() {
String schema =
PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema");
schema = org.apache.commons.lang3.StringUtils.defaultIfBlank(schema,
null);
directSql = new MetaStoreDirectSql(pm, conf, schema);
+ directSqlAggrStats = new DirectSqlAggrStats(pm,conf,schema);
}
}
if (propertyStore == null) {
@@ -9569,7 +9572,7 @@ protected ColumnStatistics
getTableColumnStatisticsInternal(
normalizeIdentifier(tableName), allowSql, allowJdo, null) {
@Override
protected ColumnStatistics getSqlResult(GetHelper<ColumnStatistics> ctx)
throws MetaException {
- return directSql.getTableStats(catName, dbName, tblName, colNames,
engine, enableBitVector, enableKll);
+ return directSqlAggrStats.getTableStats(catName, dbName, tblName,
colNames, engine, enableBitVector, enableKll);
}
@Override
@@ -9693,7 +9696,7 @@ protected List<ColumnStatistics>
getPartitionColumnStatisticsInternal(
@Override
protected List<ColumnStatistics> getSqlResult(
GetHelper<List<ColumnStatistics>> ctx) throws MetaException {
- return directSql.getPartitionStats(
+ return directSqlAggrStats.getPartitionStats(
catName, dbName, tblName, partNames, colNames, engine,
enableBitVector, enableKll);
}
@Override
@@ -9811,7 +9814,7 @@ public
List<MetaStoreServerUtils.ColStatsObjWithSourceInfo> getPartitionColStats
@Override
protected List<MetaStoreServerUtils.ColStatsObjWithSourceInfo>
getSqlResult(
GetHelper<List<MetaStoreServerUtils.ColStatsObjWithSourceInfo>> ctx)
throws MetaException {
- return directSql.getColStatsForAllTablePartitions(catName, dbName,
enableBitVector, enableKll);
+ return directSqlAggrStats.getColStatsForAllTablePartitions(catName,
dbName, enableBitVector, enableKll);
}
@Override
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
index f755aec6b19..0d22719dd6a 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -106,6 +106,8 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
@@ -776,7 +778,7 @@ public void testConcurrentDropPartitions() throws
MetaException, InvalidObjectEx
@Test
public void testDirectSQLDropPartitionsCacheInSession()
throws Exception {
- createPartitionedTable(false, false);
+ createPartitionedTable(false, false, new HashSet<>());
// query the partitions with JDO
List<Partition> partitions;
try(AutoCloseable c = deadline()) {
@@ -807,7 +809,7 @@ public void testDirectSQLDropPartitionsCacheCrossSession()
ObjectStore objectStore2 = new ObjectStore();
objectStore2.setConf(conf);
- createPartitionedTable(false, false);
+ createPartitionedTable(false, false, new HashSet<>());
GetPartitionsArgs args = new
GetPartitionsArgs.GetPartitionsArgsBuilder().max(10).build();
// query the partitions with JDO in the 1st session
List<Partition> partitions;
@@ -842,7 +844,7 @@ public void testDirectSQLDropPartitionsCacheCrossSession()
@Test
public void testDirectSQLDropPartitionsCleanup() throws Exception {
- createPartitionedTable(true, true);
+ createPartitionedTable(true, true, new HashSet<>());
// Check, that every table in the expected state before the drop
checkBackendTableSize("PARTITIONS", 3);
@@ -883,7 +885,7 @@ public void testDirectSQLDropPartitionsCleanup() throws
Exception {
@Test
public void testDirectSQLCDsCleanup() throws Exception {
- createPartitionedTable(true, true);
+ createPartitionedTable(true, true, new HashSet<>());
// Checks there is only one CD before altering partition
checkBackendTableSize("PARTITIONS", 3);
checkBackendTableSize("CDS", 1);
@@ -915,7 +917,7 @@ public void testDirectSQLCDsCleanup() throws Exception {
@Test
public void testTableStatisticsOps() throws Exception {
- createPartitionedTable(true, true);
+ createPartitionedTable(true, true, new HashSet<>());
List<ColumnStatistics> tabColStats;
try (AutoCloseable c = deadline()) {
@@ -958,13 +960,13 @@ public void testTableStatisticsOps() throws Exception {
@Test
public void testDeleteTableColumnStatisticsWhenEngineHasSpecialCharacter()
throws Exception {
- createPartitionedTable(true, true);
+ createPartitionedTable(true, true, new HashSet<>());
objectStore.deleteTableColumnStatistics(DEFAULT_CATALOG_NAME, DB1, TABLE1,
Arrays.asList("test_col1"), "special '");
}
@Test
public void testPartitionStatisticsOps() throws Exception {
- createPartitionedTable(true, true);
+ createPartitionedTable(true, true, new HashSet<>());
List<List<ColumnStatistics>> stat;
try (AutoCloseable c = deadline()) {
@@ -1015,35 +1017,93 @@ public void testPartitionStatisticsOps() throws
Exception {
@Test
public void
testDeletePartitionColumnStatisticsWhenEngineHasSpecialCharacter() throws
Exception {
- createPartitionedTable(true, true);
+ createPartitionedTable(true, true, new HashSet<>());
objectStore.deletePartitionColumnStatistics(DEFAULT_CATALOG_NAME, DB1,
TABLE1,
List.of("test_part_col=a2"), null, "special '");
}
-
- @Test
- public void testAggrStatsUseDB() throws Exception {
+ /**
+ * Re-configures objectStore with a new configuration derived from {@code conf}, in which
+ * bit-vector fetching, KLL-histogram fetching and the direct-SQL partition batch size are
+ * set to the given values.
+ */
+ private void setAggrConf(boolean enableBitVector, boolean enableKll, int
batchSize) {
Configuration conf2 = MetastoreConf.newMetastoreConf(conf);
- MetastoreConf.setBoolVar(conf2, ConfVars.STATS_FETCH_BITVECTOR, false);
- MetastoreConf.setBoolVar(conf2, ConfVars.STATS_FETCH_KLL, false);
+ MetastoreConf.setBoolVar(conf2, ConfVars.STATS_FETCH_BITVECTOR,
enableBitVector);
+ MetastoreConf.setBoolVar(conf2, ConfVars.STATS_FETCH_KLL, enableKll);
+ MetastoreConf.setLongVar(conf2, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE,
batchSize);
objectStore.setConf(conf2);
+ }
- createPartitionedTable(true, true);
-
- AggrStats aggrStats;
+ /**
+ * Calls get_aggr_stats_for on DB1.TABLE1 for partitions test_part_col=a0, a1 and a2 and the
+ * single column test_part_col with ENGINE, inside a deadline() scope.
+ */
+ private AggrStats runStatsAggregation() throws Exception {
try (AutoCloseable c = deadline()) {
- aggrStats = objectStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, DB1,
TABLE1,
- Arrays.asList("test_part_col=a0", "test_part_col=a1",
"test_part_col=a2"),
- Collections.singletonList("test_part_col"), ENGINE);
+ return objectStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, DB1, TABLE1,
+ Arrays.asList("test_part_col=a0", "test_part_col=a1",
"test_part_col=a2"),
+ Collections.singletonList("test_part_col"), ENGINE);
}
- List<ColumnStatisticsObj> stats = aggrStats.getColStats();
- Assert.assertEquals(1, stats.size());
+ }
+
+ /**
+ * Asserts that the aggregate covers exactly one column over 3 partitions and that
+ * computedStats equals long stats with numNulls=3, numDVs=2, low=3 and high=4.
+ */
+ private void assertAggrStats(AggrStats aggrStats, ColumnStatisticsData
computedStats) {
+ Assert.assertEquals(1, aggrStats.getColStats().size());
Assert.assertEquals(3, aggrStats.getPartsFound());
+ ColumnStatisticsData expectedStats = new
ColStatsBuilder<>(long.class).numNulls(3).numDVs(2)
+ .low(3L).high(4L).build();
+ assertEqualStatistics(expectedStats, computedStats);
+ }
+
+ /**
+ * Drops partitions test_part_col=a0..a2, then TABLE1, then DB1 in the default catalog,
+ * inside a deadline() scope.
+ */
+ private void statsAggrResourceCleanup()
+ throws Exception {
+ try (AutoCloseable c = deadline()) {
+ objectStore.dropPartitionsInternal(DEFAULT_CATALOG_NAME, DB1, TABLE1,
+ Arrays.asList("test_part_col=a0", "test_part_col=a1",
"test_part_col=a2"), true, true);
+ objectStore.dropTable(DEFAULT_CATALOG_NAME, DB1, TABLE1);
+ objectStore.dropDatabase(DEFAULT_CATALOG_NAME, DB1);
+ }
+ }
+
+ /**
+ * Aggregates stats for all 3 partitions with KLL fetching enabled, bit vectors disabled and
+ * a direct-SQL partition batch size of 2. With 3 partitions this presumably forces more than
+ * one batch.
+ * NOTE(review): cleanup runs only if the assertions pass (no finally).
+ */
+ @Test
+ public void testStatsAggrWithKll() throws Exception {
+ setAggrConf(false, true, 2);
+ createPartitionedTable(true, true, new HashSet<>());
+ AggrStats aggrStats = runStatsAggregation();
+ ColumnStatisticsData computedStats =
aggrStats.getColStats().get(0).getStatsData();
+ // Keep only numNulls/numDVs/low/high; presumably so that other payload (e.g. histogram)
+ // does not take part in assertEqualStatistics.
+ computedStats = new ColStatsBuilder<>(long.class)
+
.numNulls(computedStats.getLongStats().getNumNulls()).numDVs(computedStats.getLongStats().getNumDVs())
+
.low(computedStats.getLongStats().getLowValue()).high(computedStats.getLongStats().getHighValue()).build();
+ assertAggrStats(aggrStats, computedStats);
+ statsAggrResourceCleanup();
+ }
+
+ /**
+ * Aggregates stats for all 3 partitions with bit-vector fetching enabled, KLL disabled and
+ * a direct-SQL partition batch size of 2.
+ * NOTE(review): cleanup runs only if the assertions pass (no finally).
+ */
+ @Test
+ public void testStatsAggrWithBitVector() throws Exception {
+ setAggrConf(true, false, 2);
+ createPartitionedTable(true, true, new HashSet<>());
+ AggrStats aggrStats = runStatsAggregation();
ColumnStatisticsData computedStats =
aggrStats.getColStats().get(0).getStatsData();
+ // Keep only numNulls/numDVs/low/high; presumably so that other payload (e.g. bit vector)
+ // does not take part in assertEqualStatistics.
+ computedStats = new ColStatsBuilder<>(long.class)
+
.numNulls(computedStats.getLongStats().getNumNulls()).numDVs(computedStats.getLongStats().getNumDVs())
+
.low(computedStats.getLongStats().getLowValue()).high(computedStats.getLongStats().getHighValue()).build();
+ assertAggrStats(aggrStats, computedStats);
+ statsAggrResourceCleanup();
+ }
+
+ /**
+ * Aggregates stats for all 3 partitions with both bit-vector and KLL fetching disabled and a
+ * direct-SQL partition batch size of 2. The name suggests the aggregation is then done by the
+ * backend database; confirm against the aggregation code path.
+ * NOTE(review): cleanup runs only if the assertions pass (no finally).
+ */
+ @Test
+ public void testStatsAggrWithBackendDB() throws Exception {
+ setAggrConf(false, false, 2);
+ createPartitionedTable(true, true, new HashSet<>());
+ AggrStats aggrStats = runStatsAggregation();
+ ColumnStatisticsData computedStats =
aggrStats.getColStats().get(0).getStatsData();
+ assertAggrStats(aggrStats, computedStats);
+ statsAggrResourceCleanup();
+ }
+
+ /**
+ * Like testStatsAggrWithBackendDB, but no column statistics are written for partition
+ * index 1 (presumably test_part_col=a1). Expects partsFound=2 and still numNulls=3, numDVs=2,
+ * low=3, high=4 for the aggregate. Presumably the missing partition is extrapolated; confirm.
+ * NOTE(review): cleanup runs only if the assertions pass (no finally).
+ */
+ @Test
+ public void testMissingPartsStatsAggrWithBackendDB() throws Exception {
+ setAggrConf(false, false, 2);
+ createPartitionedTable(true, true, new
HashSet<>(Collections.singletonList(1)));
+ AggrStats aggrStats = runStatsAggregation();
+ ColumnStatisticsData computedStats =
aggrStats.getColStats().get(0).getStatsData();
+ Assert.assertEquals(1, aggrStats.getColStats().size());
+ Assert.assertEquals(2, aggrStats.getPartsFound());
ColumnStatisticsData expectedStats = new
ColStatsBuilder<>(long.class).numNulls(3).numDVs(2)
- .low(3L).high(4L).build();
+ .low(3L).high(4L).build();
assertEqualStatistics(expectedStats, computedStats);
+ statsAggrResourceCleanup();
}
/**
@@ -1051,7 +1111,7 @@ public void testAggrStatsUseDB() throws Exception {
* @param withPrivileges Should we create privileges as well
* @param withStatistics Should we create statitics as well
*/
- private void createPartitionedTable(boolean withPrivileges, boolean
withStatistics)
+ private void createPartitionedTable(boolean withPrivileges, boolean
withStatistics, Set<Integer> statsMissingIndices)
throws Exception {
Database db1 = new DatabaseBuilder()
.setName(DB1)
@@ -1115,6 +1175,9 @@ private void createPartitionedTable(boolean
withPrivileges, boolean withStatisti
}
if (withStatistics) {
+ if(statsMissingIndices.contains(i)){
+ continue;
+ }
ColumnStatistics stats = new ColumnStatistics();
ColumnStatisticsDesc desc = new ColumnStatisticsDesc();
desc.setCatName(tbl1.getCatName());
@@ -1802,7 +1865,7 @@ public void testListPackage() throws Exception {
@Test
public void testSavePoint() throws Exception {
List<String> partNames = Arrays.asList("test_part_col=a0",
"test_part_col=a1", "test_part_col=a2");
- createPartitionedTable(true, false);
+ createPartitionedTable(true, false, new HashSet<>());
Assert.assertEquals(3, objectStore.getPartitionCount());
objectStore.openTransaction();