ramitg254 commented on code in PR #6089:
URL: https://github.com/apache/hive/pull/6089#discussion_r2621892496


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########
@@ -2159,102 +2188,185 @@ private List<ColumnStatisticsObj> 
aggrStatsUseDB(String catName, String dbName,
           if (index == null) {
             index = IExtrapolatePartStatus.indexMaps.get("default");
           }
+
+          //for avg calculation
+          queryText = "select " + 
"sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
+              + 
"count((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
+              + 
"sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+              + 
"count((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+              + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+              + "count((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+              + " from " + PART_COL_STATS + ""
+              + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+              + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " 
+ TBLS + ".\"TBL_ID\""
+              + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\""
+              + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" 
= ? and " + TBLS + ".\"TBL_NAME\" = ? "
+              + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
+              + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
+              + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
+              + " group by \"COLUMN_NAME\"";
+
+          columnWisePartitionBatches =
+                  columnWisePartitionBatcher(queryText, catName, dbName, 
tableName, partNames, engine, doTrace);
+          Object[] sum = new Object[3];
+          Object[] count = new Object[3];
+          Integer[] avgIndex = new Integer[]{14, 15, 16};
+          try {
+            list = Batchable.runBatched(batchSize, 
Collections.singletonList(colName), columnWisePartitionBatches);
+            for (int i = 0; i < 6; i += 2) {
+              for (Object[] batch : list) {
+                sum[i / 2] = MetastoreDirectSqlUtils.sum(sum[i / 2], batch[i]);
+                count[i / 2] = MetastoreDirectSqlUtils.sum(count[i / 2], 
batch[i + 1]);
+              }
+              // filling in sum and count in row for avg calculation later on
+              row[avgIndex[i / 2] + i / 2] = sum[i / 2];
+              row[avgIndex[i / 2] + i / 2 + 1] = count[i / 2];
+            }
+          } finally {
+            columnWisePartitionBatches.closeAllQueries();
+          }
           for (int colStatIndex : index) {
             String colStatName = 
IExtrapolatePartStatus.colStatNames[colStatIndex];
             // if the aggregation type is sum, we do a scale-up
             if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Sum) {
               Object o = sumMap.get(colName).get(colStatIndex);
+              // +5 only for the case of SUM_NUM_DISTINCTS which is after avg 
indices
+              int rowIndex = (colStatIndex == 15) ? colStatIndex + 5 : 
colStatIndex + 2;
               if (o == null) {
-                row[2 + colStatIndex] = null;
+                row[rowIndex] = null;
               } else {
                 Long val = MetastoreDirectSqlUtils.extractSqlLong(o);
-                row[2 + colStatIndex] = val / sumVal * (partNames.size());
+                row[rowIndex] = val / sumVal * (partNames.size());
               }
             } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Min
                 || IExtrapolatePartStatus.aggrTypes[colStatIndex] == 
IExtrapolatePartStatus.AggrType.Max) {
               // if the aggregation type is min/max, we extrapolate from the
               // left/right borders
-              if (!decimal) {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by \"" + colStatName + "\"";
-              } else {
-                queryText = "select \"" + colStatName + "\",\"PART_NAME\" from 
" + PART_COL_STATS
-                    + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-                    + " inner join " + TBLS + " on " + PARTITIONS + 
".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\""
-                    + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
-                    + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                    + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                    + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
-                    + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                    + " order by cast(\"" + colStatName + "\" as decimal)";
-              }
-              start = doTrace ? System.nanoTime() : 0;
-              try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
-                }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] min = (Object[]) (fqr.get(0));
-                Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
-                if (min[0] == null || max[0] == null) {
-                  row[2 + colStatIndex] = null;
-                } else {
-                  row[2 + colStatIndex] = extrapolateMethod
-                      .extrapolate(min, max, colStatIndex, indexMap);
-                }
-              }
-            } else {
-              // if the aggregation type is avg, we use the average on the 
existing ones.
-              queryText = "select "
-                  + 
"avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as 
decimal)),"
-                  + 
"avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-                  + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as 
decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
-                  + " from " + PART_COL_STATS + ""
+              String orderByExpr = decimal ? "cast(\"" + colStatName + "\" as 
decimal)" : "\"" + colStatName + "\"";
+
+              queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " 
+ PART_COL_STATS
                   + " inner join " + PARTITIONS + " on " + PART_COL_STATS + 
".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
                   + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" 
= " + TBLS + ".\"TBL_ID\""
                   + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + 
DBS + ".\"DB_ID\""
                   + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + 
".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "
-                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? "
-                  + " and " + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(partNames.size()) + ")"
+                  + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)"
+                  + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)"
                   + " and " + PART_COL_STATS + ".\"ENGINE\" = ? "
-                  + " group by \"COLUMN_NAME\"";
-              start = doTrace ? System.nanoTime() : 0;
-              try(QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-                Object qResult = executeWithArray(query.getInnerQuery(),
-                    prepareParams(catName, dbName, tableName, partNames, 
Arrays.asList(colName), engine), queryText);
-                if (qResult == null) {
-                  return Collections.emptyList();
+                  + " order by " + orderByExpr;
+
+              columnWisePartitionBatches =
+                      columnWisePartitionBatcher(queryText, catName, dbName, 
tableName, partNames, engine, doTrace);
+              try {
+                list = Batchable.runBatched(batchSize, 
Collections.singletonList(colName), columnWisePartitionBatches);
+                Object[] min = list.getFirst();
+                Object[] max = list.getLast();
+                for (int i = Math.min(batchSize - 1, list.size() - 1); i < 
list.size(); i += batchSize) {
+                  Object[] posMax = list.get(i);
+                  if (new BigDecimal(max[0].toString()).compareTo(new 
BigDecimal(posMax[0].toString())) < 0) {
+                    max = posMax;
+                  }
+                  int j = i + 1;
+                  if (j < list.size()) {
+                    Object[] posMin = list.get(j);
+                    if (new BigDecimal(min[0].toString()).compareTo(new 
BigDecimal(posMin[0].toString())) > 0) {
+                      min = posMin;
+                    }
+                  }
                 }
-                fqr = (ForwardQueryResult<?>) qResult;
-                Object[] avg = (Object[]) (fqr.get(0));
-                // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
-                // "AVG_DECIMAL"
-                row[2 + colStatIndex] = avg[colStatIndex - 12];
-                end = doTrace ? System.nanoTime() : 0;
-                MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
end);
+                if (min[0] == null || max[0] == null) {
+                  row[2 + colStatIndex] = null;
+                } else {
+                  row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, 
max, colStatIndex, indexMap);
+                }
+              } finally {
+                columnWisePartitionBatches.closeAllQueries();
               }
             }
           }
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, 
useDensityFunctionForNDVEstimation, ndvTuner));
+          colStats.add(prepareCSObjWithAdjustedNDV
+                  (row, useDensityFunctionForNDVEstimation, ndvTuner));
           Deadline.checkTimeout();
         }
       }
       return colStats;
     }
   }
 
-  private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws 
MetaException {
+  private void columnWiseStatsMerger(

Review Comment:
   Separate method for the code chunk mentioned in:
   https://github.com/apache/hive/pull/6089#discussion_r2589091584



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to