[ 
https://issues.apache.org/jira/browse/DRILL-7064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812637#comment-16812637
 ] 

ASF GitHub Bot commented on DRILL-7064:
---------------------------------------

dvjyothsna commented on pull request #1736: DRILL-7064: Leverage the summary 
metadata for plain COUNT aggregates.
URL: https://github.com/apache/drill/pull/1736#discussion_r273158196
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
 ##########
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.planner.common.CountToDirectScanUtils;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.direct.MetadataDirectGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.Set;
+
+/**
+ * <p> This rule is a logical planning counterpart to a corresponding <b>ConvertCountToDirectScanPrule</b>
+ * physical rule
+ * </p>
+ * <p>
+ * This rule will convert <b>" select count(*)  as mycount from table "</b>
+ * or <b>" select count(not-nullable-expr) as mycount from table "</b> into
+ * <pre>
+ *    Project(mycount)
+ *         \
+ *    DirectGroupScan ( PojoRecordReader ( rowCount ))
+ *</pre>
+ * or <b>" select count(column) as mycount from table "</b> into
+ * <pre>
+ *      Project(mycount)
+ *           \
+ *            DirectGroupScan (PojoRecordReader (columnValueCount))
+ *</pre>
+ * The rule can also be applied if the query contains multiple count expressions, e.g.
+ * <b>" select count(column1), count(column2), count(*) from table "</b>
+ * </p>
+ *
+ * <p>
+ * The rule utilizes the Parquet Metadata Cache's summary information to retrieve the total row count
+ * and the per-column null count.  As such, the rule is only applicable for Parquet tables and only if the
+ * metadata cache has been created with the summary information.
+ * </p>
+ */
+public class ConvertCountToDirectScanRule extends RelOptRule {
+
+  public static final RelOptRule AGG_ON_PROJ_ON_SCAN = new ConvertCountToDirectScanRule(
+      RelOptHelper.some(Aggregate.class,
+                        RelOptHelper.some(Project.class,
+                            RelOptHelper.any(TableScan.class))), "Agg_on_proj_on_scan:logical");
+
+  public static final RelOptRule AGG_ON_SCAN = new ConvertCountToDirectScanRule(
+      RelOptHelper.some(Aggregate.class,
+                            RelOptHelper.any(TableScan.class)), "Agg_on_scan:logical");
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanRule.class);
+
+  protected ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) {
+    super(rule, "ConvertCountToDirectScan:logical:" + id);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate agg = (Aggregate) call.rel(0);
+    final TableScan scan = (TableScan) call.rel(call.rels.length - 1);
+    final Project project = call.rels.length == 3 ? (Project) call.rel(1) : null;
+
+    // Qualifying conditions for rule:
+    //    1) There's no GroupBY key,
+    //    2) Agg is not a DISTINCT agg
+    //    3) Additional checks are done further below ..
+    if (agg.getGroupCount() > 0 ||
+        agg.containsDistinctCall()) {
+      return;
+    }
+
+    DrillTable drillTable = DrillRelOptUtil.getDrillTable(scan);
+
+    if (drillTable == null) {
+      logger.debug("Rule does not apply since an eligible drill table instance 
was not found.");
+      return;
+    }
+
+    Object selection = drillTable.getSelection();
+
+    if (!(selection instanceof FormatSelection)) {
+      logger.debug("Rule does not apply since only Parquet file format is 
eligible.");
+      return;
+    }
+
+    //  Rule is applicable only if the statistics for row count and null count are available from the metadata.
+    FormatSelection formatSelection = (FormatSelection) selection;
+    Pair<Boolean, Metadata_V4.MetadataSummary> status = checkMetadataForScanStats(drillTable, formatSelection);
+
+    if (!status.getLeft()) {
+      logger.debug("Rule does not apply since MetadataSummary metadata was not 
found.");
+      return;
+    }
+
+    PlannerSettings settings = call.getPlanner().getContext().unwrap(PlannerSettings.class);
+    Metadata_V4.MetadataSummary metadataSummary = status.getRight();
+    Map<String, Long> result = collectCounts(settings, metadataSummary, agg, scan, project);
+    logger.trace("Calculated the following aggregate counts: {}", result);
+
+    // if counts could not be determined, rule won't be applied
+    if (result.isEmpty()) {
+      logger.debug("Rule does not apply since one or more COUNTs could not be 
determined from metadata.");
+      return;
+    }
+
+    List<Path> fileList =
+            ImmutableList.of(Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot()));
+
+    final RelDataType scanRowType = CountToDirectScanUtils.constructDataType(agg, result.keySet());
+
+    final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
+        CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()),
+        Collections.singletonList((List<Long>) new ArrayList<>(result.values())));
+
+    final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount());
+    final MetadataDirectGroupScan directScan = new MetadataDirectGroupScan(reader, fileList, scanStats, true);
+
+    final DrillDirectScanRel newScan = new DrillDirectScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+      directScan, scanRowType);
+
+    final DrillProjectRel newProject = new DrillProjectRel(agg.getCluster(), agg.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+      newScan, CountToDirectScanUtils.prepareFieldExpressions(scanRowType), agg.getRowType());
+
+    call.transformTo(newProject);
+  }
+
+  private Pair<Boolean, Metadata_V4.MetadataSummary> checkMetadataForScanStats(DrillTable drillTable, FormatSelection formatSelection) {
+
+    // Currently only support metadata rowcount stats for Parquet tables
+    FormatPluginConfig formatConfig = formatSelection.getFormat();
+    if (!((formatConfig instanceof ParquetFormatConfig)
+      || ((formatConfig instanceof NamedFormatPluginConfig)
+      && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
+      return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+    }
+
+    FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
+    DrillFileSystem fs = null;
+    try {
+       fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
+    } catch (IOException e) {
+      logger.warn("Unable to create the file system object for retrieving statistics from metadata cache file ", e);
+      return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+    }
+
+    // check if the cacheFileRoot has been set: this is needed because after directory pruning, the
+    // cacheFileRoot could have been changed and not be the same as the original selectionRoot
+    Path selectionRoot = formatSelection.getSelection().getCacheFileRoot() != null ?
+            formatSelection.getSelection().getCacheFileRoot() :
+            formatSelection.getSelection().getSelectionRoot();
+    Metadata_V4.MetadataSummary metadataSummary = Metadata.getSummary(fs, selectionRoot, false, ParquetReaderConfig.builder().build());
+
+    return metadataSummary != null ? new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(true, metadataSummary) :
+      new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+  }
+
+  /**
+   * Collects counts for each aggregation call by using the metadata summary information.
+   * Returns an empty result map if the count could not be determined for at least one aggregation call.
+   *
+   * For each aggregate call it determines whether the count can be calculated; counts are collected only for the COUNT function.
+   *   1. First, we get the total row count from the metadata summary.
+   *   2. For COUNT(*), COUNT(<non null column>) and COUNT(<implicit column>), the count = total row count
+   *   3. For COUNT(nullable column), count = (total row count - column's null count)
+   *   4. The count cannot be calculated for partition columns.
+   *
+   * @param settings planner options
+   * @param metadataSummary metadata summary containing row counts and column counts
+   * @param agg aggregate relational expression
+   * @param scan scan relational expression
+   * @param project project relational expression
+   * @return result map where key is count column name, value is count value
+   */
+  private Map<String, Long> collectCounts(PlannerSettings settings, Metadata_V4.MetadataSummary metadataSummary,
+                                          Aggregate agg, TableScan scan, Project project) {
+    final Set<String> implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
+    final long totalRecordCount = metadataSummary.getTotalRowCount();
+    final LinkedHashMap<String, Long> result = new LinkedHashMap<>();
+
+    for (int i = 0; i < agg.getAggCallList().size(); i++) {
+      AggregateCall aggCall = agg.getAggCallList().get(i);
+      long cnt;
+
+      // rule can be applied only for count function, return empty counts
+      if (!"count".equalsIgnoreCase(aggCall.getAggregation().getName()) ) {
+        return ImmutableMap.of();
+      }
+
+      if (CountToDirectScanUtils.containsStarOrNotNullInput(aggCall, agg)) {
+        cnt = totalRecordCount;
+
+      } else if (aggCall.getArgList().size() == 1) {
+        // count(columnName) ==> Agg ( Scan )) ==> columnValueCount
+        int index = aggCall.getArgList().get(0);
+
+        if (project != null) {
+          // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef in Project, we find the index of Scan's field.
+          // For instance,
+          // Agg - count($0)
+          //  \
+          //  Proj - Exp={$1}
+          //    \
+          //   Scan (col1, col2).
+          // return count of "col2" in Scan's metadata, if found.
+          if (!(project.getProjects().get(index) instanceof RexInputRef)) {
+            return ImmutableMap.of(); // do not apply for all other cases.
+          }
+
+          index = ((RexInputRef) project.getProjects().get(index)).getIndex();
+        }
+
+        String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();
+
+        // for implicit column count will be the same as total record count
+        if (implicitColumnsNames.contains(columnName)) {
+          cnt = totalRecordCount;
+        } else {
+          SchemaPath simplePath = SchemaPath.getSimplePath(columnName);
+
+          if (ColumnExplorer.isPartitionColumn(settings.getOptions(), simplePath)) {
+            return ImmutableMap.of();
+          }
+
+          Metadata_V4.ColumnTypeMetadata_v4 columnMetadata = metadataSummary.getColumnTypeInfo(new Metadata_V4.ColumnTypeMetadata_v4.Key(simplePath));
+
+          if (columnMetadata == null) {
+            // if column stats are not available, don't apply this rule and return empty counts
+            return ImmutableMap.of();
+          } else {
+            // count of a nullable column = (total row count - column's null count)
+            cnt = totalRecordCount - columnMetadata.totalNullCount;
 
 Review comment:
   @vvysotskyi Just to clarify, your question is what the nullCount in
   ColumnTypeMetadata will be. As we did for V3, the null count is set to -1
   when the statistics are not present; this is done
   [here](https://github.com/apache/drill/blob/bd5c530262dfe05ce21e1a7d2a5822553fe82148/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java#L559).
   This applies to all columns, i.e., both the interesting and non-interesting
   columns.
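   For illustration only (this sketch is not part of the patch): one way a
   caller could guard against the -1 sentinel before applying
   count(col) = totalRowCount - nullCount. The helper class and method names
   are hypothetical; the field and class names mirror the ones in the diff above.

```java
import java.util.OptionalLong;

import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;

public class ColumnCountSketch {

  // Returns the value count for a column, or empty when the summary has no
  // statistics for it (totalNullCount == -1, matching the V3 behavior noted
  // above), in which case the rule would simply not be applied for that column.
  public static OptionalLong columnValueCount(long totalRowCount,
      Metadata_V4.ColumnTypeMetadata_v4 columnMetadata) {
    if (columnMetadata == null || columnMetadata.totalNullCount < 0) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(totalRowCount - columnMetadata.totalNullCount);
  }
}
```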
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Leverage the summary's totalRowCount and totalNullCount for COUNT() queries 
> (also prevent eager expansion of files)
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-7064
>                 URL: https://issues.apache.org/jira/browse/DRILL-7064
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Metadata
>            Reporter: Venkata Jyothsna Donapati
>            Assignee: Aman Sinha
>            Priority: Major
>             Fix For: 1.16.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> This sub-task is meant to leverage the Parquet metadata cache's summary 
> stats, namely the totalRowCount (across all files and row groups) and the 
> per-column totalNullCount (across all files and row groups), to answer plain 
> COUNT aggregation queries without a GROUP BY.  Such queries are currently 
> converted to a DirectScan by the ConvertCountToDirectScanRule, which utilizes 
> the row group metadata. However, that rule is applied on the Drill logical 
> rels and converts the logical plan to a physical plan with a DirectScanPrel, 
> which is too late: the DrillScanRel created during logical planning has 
> already read the entire metadata cache file along with its full list of row 
> group entries. The metadata cache file can grow quite large, so this approach 
> does not scale. 
> The solution is to use the Metadata Summary file created in DRILL-7063 and to 
> add a new rule that applies early, operating on the Calcite logical rels 
> instead of the Drill logical rels and thereby preventing eager expansion of 
> the list of files/row groups.   
> We will not remove the existing rule; it will continue to operate as before 
> because, even after some transformations, we may still want to apply the 
> optimizations for COUNT queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
