ramitg254 commented on code in PR #6089:
URL: https://github.com/apache/hive/pull/6089#discussion_r2708622327


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -1957,307 +2010,224 @@ private List<ColumnStatisticsObj> 
aggrStatsUseJava(String catName, String dbName
         areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
   }
 
-  private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String 
dbName,
-      String tableName, List<String> partNames, List<String> colNames, String 
engine,
-      boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, 
double ndvTuner) throws MetaException {
+  private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String 
dbName, String tableName,
+                                                   List<String> partNames, 
List<String> colNames, String engine,
+                                                   boolean 
useDensityFunctionForNDVEstimation, double ndvTuner)
+          throws MetaException {
     // TODO: all the extrapolation logic should be moved out of this class,
     // only mechanical data retrieval should remain here.
-    String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
-        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), 
min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
-        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), 
max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
-        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
-        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), 
sum(\"NUM_FALSES\"), "
-        // The following data is used to compute a partitioned table's NDV 
based
-        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. 
Global NDVs cannot be
-        // accurately derived from partition NDVs, because the domain of 
column value two partitions
-        // can overlap. If there is no overlap then global NDV is just the sum
-        // of partition NDVs (UpperBound). But if there is some overlay then
-        // global NDV can be anywhere between sum of partition NDVs (no 
overlap)
-        // and same as one of the partition NDV (domain of column value in all 
other
-        // partitions is subset of the domain value in one of the partition)
-        // (LowerBound).But under uniform distribution, we can roughly 
estimate the global
-        // NDV by leveraging the min/max values.
-        // And, we also guarantee that the estimation makes sense by comparing 
it to the
-        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
-        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
-        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" 
as decimal)),"
-        + 
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
-        + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
-        + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ? ";
-    String queryText = null;
-    long start = 0;
-    long end = 0;
+    String queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+            + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), 
min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+            + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), 
max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+            + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+            + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), 
sum(\"NUM_FALSES\"), "
+            // The following data is used to compute a partitioned table's NDV 
based
+            // on partitions' NDV when useDensityFunctionForNDVEstimation = 
true. Global NDVs cannot be
+            // accurately derived from partition NDVs, because the domain of 
column value two partitions
+            // can overlap. If there is no overlap then global NDV is just the 
sum
+            // of partition NDVs (UpperBound). But if there is some overlay 
then
+            // global NDV can be anywhere between sum of partition NDVs (no 
overlap)
+            // and same as one of the partition NDV (domain of column value in 
all other
+            // partitions is subset of the domain value in one of the 
partition)
+            // (LowerBound).But under uniform distribution, we can roughly 
estimate the global
+            // NDV by leveraging the min/max values.
+            // And, we also guarantee that the estimation makes sense by 
comparing it to the
+            // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
+            // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+            + 
"sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
+            + 
"sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+            + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+            + "count(1),"
+            + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
+            + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+            + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
+            + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
+            + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = 
? and " + TBLS + ".\"TBL_NAME\" = ? "
+            + " and \"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS + 
".\"PART_NAME\" in (%2$s)"
+            + " and \"ENGINE\" = ? " + " group by \"COLUMN_NAME\", 
\"COLUMN_TYPE\"";
 
     boolean doTrace = LOG.isDebugEnabled();
-    ForwardQueryResult<?> fqr = null;
-    // Check if the status of all the columns of all the partitions exists
-    // Extrapolation is not needed.
-    if (areAllPartsFound) {
-      queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + 
makeParams(colNames.size()) + ")"
-          + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-          + " and \"ENGINE\" = ? "
-          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-      start = doTrace ? System.nanoTime() : 0;
-      try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-        Object qResult = executeWithArray(query.getInnerQuery(),
-            prepareParams(catName, dbName, tableName, partNames, colNames,
-                engine), queryText);
-        if (qResult == null) {
-          return Collections.emptyList();
-        }
-        end = doTrace ? System.nanoTime() : 0;
-        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
-        List<ColumnStatisticsObj> colStats =
-            new ArrayList<ColumnStatisticsObj>(list.size());
-        for (Object[] row : list) {
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
-              useDensityFunctionForNDVEstimation, ndvTuner));
-          Deadline.checkTimeout();
-        }
-        return colStats;
-      }
-    } else {
-      // Extrapolation is needed for some columns.
-      // In this case, at least a column status for a partition is missing.
-      // We need to extrapolate this partition based on the other partitions
-      List<ColumnStatisticsObj> colStats = new 
ArrayList<ColumnStatisticsObj>(colNames.size());
-      queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", 
count(\"PART_COL_STATS\".\"PART_ID\") "
-          + " from " + PART_COL_STATS
-          + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-          + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-          + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-          + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? 
and " + TBLS + ".\"TBL_NAME\" = ? "
-          + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + 
makeParams(colNames.size()) + ")"
-          + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-          + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-          + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + 
PART_COL_STATS + ".\"COLUMN_TYPE\"";
-      start = doTrace ? System.nanoTime() : 0;
-      List<String> noExtraColumnNames = new ArrayList<String>();
-      Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, 
String[]>();
-      try(QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-        Object qResult = executeWithArray(query.getInnerQuery(),
-            prepareParams(catName, dbName, tableName, partNames, colNames,
-                engine), queryText);
-        end = doTrace ? System.nanoTime() : 0;
-        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        if (qResult == null) {
-          return Collections.emptyList();
-        }
 
-        List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
-        for (Object[] row : list) {
-          String colName = (String) row[0];
-          String colType = (String) row[1];
-          // Extrapolation is not needed for this column if
-          // count(\"PARTITION_NAME\")==partNames.size()
-          // Or, extrapolation is not possible for this column if
-          // count(\"PARTITION_NAME\")<2
-          Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
-          if (count == partNames.size() || count < 2) {
-            noExtraColumnNames.add(colName);
-          } else {
-            extraColumnNameTypeParts.put(colName, new String[] {colType, 
String.valueOf(count)});
-          }
-          Deadline.checkTimeout();
-        }
-      }
-      // Extrapolation is not needed for columns noExtraColumnNames
-      List<Object[]> list;
-      if (noExtraColumnNames.size() != 0) {
-        queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
-            + makeParams(noExtraColumnNames.size()) + ")" + " and 
\"PARTITION_NAME\" in ("
-            + makeParams(partNames.size()) + ")"
-            + " and \"ENGINE\" = ? "
-            + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-        start = doTrace ? System.nanoTime() : 0;
-
-        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-          Object qResult = executeWithArray(query.getInnerQuery(),
-              prepareParams(catName, dbName, tableName, partNames, 
noExtraColumnNames, engine), queryText);
-          if (qResult == null) {
-            return Collections.emptyList();
-          }
-          list = MetastoreDirectSqlUtils.ensureList(qResult);
-          for (Object[] row : list) {
-            colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
-                useDensityFunctionForNDVEstimation, ndvTuner));
-            Deadline.checkTimeout();
-          }
-          end = doTrace ? System.nanoTime() : 0;
-          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
+    List<ColumnStatisticsObj> colStats = new ArrayList<>(colNames.size());
+    List<Object[]> partialStatsRows = new ArrayList<>(colNames.size());
+    columnWiseStatsMerger(queryText, catName, dbName, tableName,
+            colNames, partNames, colStats, partialStatsRows,
+            engine, useDensityFunctionForNDVEstimation, ndvTuner, doTrace);
+
+    // Extrapolation is needed for partialStatsRows.
+    if (partialStatsRows.size() != 0) {
+      Map<String, Integer> indexMap = new HashMap<String, Integer>();
+      for (int index = 0; index < partNames.size(); index++) {
+        indexMap.put(partNames.get(index), index);
+      }
+
+      for (Object[] row : partialStatsRows) {
+        String colName = row[COLNAME.idx()].toString();
+        String colType = row[COLTYPE.idx()].toString();
+        BigDecimal countVal = new BigDecimal(row[COUNT_ROWS.idx()].toString());
+
+        // use linear extrapolation. more complicated one can be added in the
+        // future.
+        IExtrapolatePartStatus extrapolateMethod = new 
LinearExtrapolatePartStatus();
+        // fill in colstatus
+        Integer[] index;
+        boolean decimal = false;
+        if (colType.toLowerCase().startsWith("decimal")) {
+          index = IExtrapolatePartStatus.indexMaps.get("decimal");
+          decimal = true;
+        } else {
+          index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
         }
-      }
-      // Extrapolation is needed for extraColumnNames.
-      // give a sequence number for all the partitions
-      if (extraColumnNameTypeParts.size() != 0) {
-        Map<String, Integer> indexMap = new HashMap<String, Integer>();
-        for (int index = 0; index < partNames.size(); index++) {
-          indexMap.put(partNames.get(index), index);
+        // if the colType is not the known type, long, double, etc, then get
+        // all index.
+        if (index == null) {
+          index = IExtrapolatePartStatus.indexMaps.get("default");
         }
-        // get sum for all columns to reduce the number of queries
-        Map<String, Map<Integer, Object>> sumMap = new HashMap<String, 
Map<Integer, Object>>();
-        queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), 
sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
-            + " from " + PART_COL_STATS
-            + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-            + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + 
TBLS + ".\"TBL_ID\""
-            + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
-            + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = 
? and " + TBLS + ".\"TBL_NAME\" = ? "
-            + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + 
makeParams(extraColumnNameTypeParts.size()) + ")"
-            + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-            + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-            + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\"";
-        start = doTrace ? System.nanoTime() : 0;
-        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-          List<String> extraColumnNames = new ArrayList<String>();
-          extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
-          Object qResult = executeWithArray(query.getInnerQuery(),
-              prepareParams(catName, dbName, tableName, partNames,
-                  extraColumnNames, engine), queryText);
-          if (qResult == null) {
-            return Collections.emptyList();
-          }
-          list = MetastoreDirectSqlUtils.ensureList(qResult);
-          // see the indexes for colstats in IExtrapolatePartStatus
-          Integer[] sumIndex = new Integer[] {6, 10, 11, 15};
-          for (Object[] row : list) {
-            Map<Integer, Object> indexToObject = new HashMap<Integer, 
Object>();
-            for (int ind = 1; ind < row.length; ind++) {
-              indexToObject.put(sumIndex[ind - 1], row[ind]);
+
+        for (int colStatIndex : index) {
+          String colStatName = 
IExtrapolatePartStatus.colStatNames[colStatIndex];
+          // if the aggregation type is sum, we do a scale-up
+          if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Sum) {
+            // +3 only for the case of SUM_NUM_DISTINCTS which is after count 
rows index
+            int rowIndex = (colStatIndex == 15) ? colStatIndex + 3 : 
colStatIndex + 2;
+            if (row[rowIndex] != null) {
+              Long val = MetastoreDirectSqlUtils.extractSqlLong(row[rowIndex]);
+              row[rowIndex] = val / countVal.longValue() * (partNames.size());
             }
-            // row[0] is the column name
-            sumMap.put((String) row[0], indexToObject);
-            Deadline.checkTimeout();
-          }
-          end = doTrace ? System.nanoTime() : 0;
-          MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
-        }
-        for (Map.Entry<String, String[]> entry : 
extraColumnNameTypeParts.entrySet()) {
-          Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length 
+ 2];
-          String colName = entry.getKey();
-          String colType = entry.getValue()[0];
-          Long sumVal = Long.parseLong(entry.getValue()[1]);
-          // fill in colname
-          row[0] = colName;
-          // fill in coltype
-          row[1] = colType;
-          // use linear extrapolation. more complicated one can be added in the
-          // future.
-          IExtrapolatePartStatus extrapolateMethod = new 
LinearExtrapolatePartStatus();
-          // fill in colstatus
-          Integer[] index = null;
-          boolean decimal = false;
-          if (colType.toLowerCase().startsWith("decimal")) {
-            index = IExtrapolatePartStatus.indexMaps.get("decimal");
-            decimal = true;
-          } else {
-            index = 
IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
-          }
-          // if the colType is not the known type, long, double, etc, then get
-          // all index.
-          if (index == null) {
-            index = IExtrapolatePartStatus.indexMaps.get("default");
-          }
-          for (int colStatIndex : index) {
-            String colStatName = 
IExtrapolatePartStatus.colStatNames[colStatIndex];
-            // if the aggregation type is sum, we do a scale-up
-            if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Sum) {
-              Object o = sumMap.get(colName).get(colStatIndex);
-              if (o == null) {
-                row[2 + colStatIndex] = null;
-              } else {
-                Long val = MetastoreDirectSqlUtils.extractSqlLong(o);
-                row[2 + colStatIndex] = val / sumVal * (partNames.size());
-              }
-            } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Min
-                || IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Max) {
-              // if the aggregation type is min/max, we extrapolate from the
-              // left/right borders
-              if (!decimal) {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by \"" + colStatName + "\"";
-              } else {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
+          } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Min ||
+                  IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Max) {
+            // if the aggregation type is min/max, we extrapolate from the
+            // left/right borders
+            String orderByExpr = decimal ? "cast(\"" + colStatName + "\" as 
decimal)" : "\"" + colStatName + "\"";

Review Comment:
   actually its not the same min max so here what we are doing is:
   let's say we have LONG_LOW_VALUE which is evaluated by taking min() of it so 
it is `IExtrapolatePartStatus.AggrType.Min` for which we have its minimum value 
but we need its maximum value as well which is not yet evaluated and is 
required for 
   `row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, max, 
colStatIndex, indexMap);`
   and also i could have used directly MAX() in the query for that instead of 
order by but `PART_NAME` is also used in the extrapolate method so going with 
order by
   and vice-versa the case with `IExtrapolatePartStatus.AggrType.Min` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to