[
https://issues.apache.org/jira/browse/DRILL-7064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812742#comment-16812742
]
ASF GitHub Bot commented on DRILL-7064:
---------------------------------------
vvysotskyi commented on pull request #1736: DRILL-7064: Leverage the summary
metadata for plain COUNT aggregates.
URL: https://github.com/apache/drill/pull/1736#discussion_r273211742
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.planner.common.CountToDirectScanUtils;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.direct.MetadataDirectGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.Set;
+
+/**
+ * <p> This rule is a logical planning counterpart to a corresponding
<b>ConvertCountToDirectScanPrule</b>
+ * physical rule
+ * </p>
+ * <p>
+ * This rule will convert <b>" select count(*) as mycount from table "</b>
+ * or <b>" select count(not-nullable-expr) as mycount from table "</b> into
+ * <pre>
+ * Project(mycount)
+ * \
+ * DirectGroupScan ( PojoRecordReader ( rowCount ))
+ *</pre>
+ * or <b>" select count(column) as mycount from table "</b> into
+ * <pre>
+ * Project(mycount)
+ * \
+ * DirectGroupScan (PojoRecordReader (columnValueCount))
+ *</pre>
+ * Rule can be applied if query contains multiple count expressions.
+ * <b>" select count(column1), count(column2), count(*) from table "</b>
+ * </p>
+ *
+ * <p>
+ * The rule utilizes the Parquet Metadata Cache's summary information to
retrieve the total row count
+ * and the per-column null count. As such, the rule is only applicable for
Parquet tables and only if the
+ * metadata cache has been created with the summary information.
+ * </p>
+ */
+public class ConvertCountToDirectScanRule extends RelOptRule {
+
+ public static final RelOptRule AGG_ON_PROJ_ON_SCAN = new
ConvertCountToDirectScanRule(
+ RelOptHelper.some(Aggregate.class,
+ RelOptHelper.some(Project.class,
+ RelOptHelper.any(TableScan.class))),
"Agg_on_proj_on_scan:logical");
+
+ public static final RelOptRule AGG_ON_SCAN = new
ConvertCountToDirectScanRule(
+ RelOptHelper.some(Aggregate.class,
+ RelOptHelper.any(TableScan.class)),
"Agg_on_scan:logical");
+
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanRule.class);
+
+ protected ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) {
+ super(rule, "ConvertCountToDirectScan:logical:" + id);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Aggregate agg = (Aggregate) call.rel(0);
+ final TableScan scan = (TableScan) call.rel(call.rels.length - 1);
+ final Project project = call.rels.length == 3 ? (Project) call.rel(1) :
null;
+
+ // Qualifying conditions for rule:
+ // 1) There's no GroupBY key,
+ // 2) Agg is not a DISTINCT agg
+ // 3) Additional checks are done further below ..
+ if (agg.getGroupCount() > 0 ||
+ agg.containsDistinctCall()) {
+ return;
+ }
+
+ DrillTable drillTable = DrillRelOptUtil.getDrillTable(scan);
+
+ if (drillTable == null) {
+ logger.debug("Rule does not apply since an eligible drill table instance
was not found.");
+ return;
+ }
+
+ Object selection = drillTable.getSelection();
+
+ if (!(selection instanceof FormatSelection)) {
+ logger.debug("Rule does not apply since only Parquet file format is
eligible.");
+ return;
+ }
+
+ // Rule is applicable only if the statistics for row count and null count
are available from the metadata,
+ FormatSelection formatSelection = (FormatSelection) selection;
+ Pair<Boolean, Metadata_V4.MetadataSummary> status =
checkMetadataForScanStats(drillTable, formatSelection);
+
+ if (!status.getLeft()) {
+ logger.debug("Rule does not apply since MetadataSummary metadata was not
found.");
+ return;
+ }
+
+ PlannerSettings settings =
call.getPlanner().getContext().unwrap(PlannerSettings.class);
+ Metadata_V4.MetadataSummary metadataSummary = status.getRight();
+ Map<String, Long> result = collectCounts(settings, metadataSummary, agg,
scan, project);
+ logger.trace("Calculated the following aggregate counts: ", result);
+
+ // if counts could not be determined, rule won't be applied
+ if (result.isEmpty()) {
+ logger.debug("Rule does not apply since one or more COUNTs could not be
determined from metadata.");
+ return;
+ }
+
+ List<Path> fileList =
+
ImmutableList.of(Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot()));
+
+ final RelDataType scanRowType =
CountToDirectScanUtils.constructDataType(agg, result.keySet());
+
+ final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
+ CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()),
+ Collections.singletonList((List<Long>) new
ArrayList<>(result.values())));
+
+ final ScanStats scanStats = new
ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1,
scanRowType.getFieldCount());
+ final MetadataDirectGroupScan directScan = new
MetadataDirectGroupScan(reader, fileList, scanStats, true);
+
+ final DrillDirectScanRel newScan = new
DrillDirectScanRel(scan.getCluster(),
scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ directScan, scanRowType);
+
+ final DrillProjectRel newProject = new DrillProjectRel(agg.getCluster(),
agg.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ newScan, CountToDirectScanUtils.prepareFieldExpressions(scanRowType),
agg.getRowType());
+
+ call.transformTo(newProject);
+ }
+
+ private Pair<Boolean, Metadata_V4.MetadataSummary>
checkMetadataForScanStats(DrillTable drillTable, FormatSelection
formatSelection) {
+
+ // Currently only support metadata rowcount stats for Parquet tables
+ FormatPluginConfig formatConfig = formatSelection.getFormat();
+ if (!((formatConfig instanceof ParquetFormatConfig)
+ || ((formatConfig instanceof NamedFormatPluginConfig)
+ && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
+ return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false,
null);
+ }
+
+ FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
+ DrillFileSystem fs = null;
+ try {
+ fs = new
DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
+ } catch (IOException e) {
+ logger.warn("Unable to create the file system object for retrieving
statistics from metadata cache file ", e);
+ return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false,
null);
+ }
+
+ // check if the cacheFileRoot has been set: this is needed because after
directory pruning, the
+ // cacheFileRoot could have been changed and not be the same as the
original selectionRoot
+ Path selectionRoot = formatSelection.getSelection().getCacheFileRoot() !=
null ?
+ formatSelection.getSelection().getCacheFileRoot() :
+ formatSelection.getSelection().getSelectionRoot();
+ Metadata_V4.MetadataSummary metadataSummary = Metadata.getSummary(fs,
selectionRoot, false, ParquetReaderConfig.builder().build());
Review comment:
Agree, also it is important to pass `OptionManager` into
`ParquetReaderConfig`: `withOptions(settings.getOptions())` to handle some
cases when statistics for varchars are allowed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Leverage the summary's totalRowCount and totalNullCount for COUNT() queries
> (also prevent eager expansion of files)
> -------------------------------------------------------------------------------------------------------------------
>
> Key: DRILL-7064
> URL: https://issues.apache.org/jira/browse/DRILL-7064
> Project: Apache Drill
> Issue Type: Sub-task
> Components: Metadata
> Reporter: Venkata Jyothsna Donapati
> Assignee: Aman Sinha
> Priority: Major
> Fix For: 1.16.0
>
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> This sub-task is meant to leverage the Parquet metadata cache's summary
> stats: totalRowCount (across all files and row groups) and the per-column
> totalNullCount (across all files and row groups) to answer plain COUNT
> aggregation queries without Group-By. These are currently converted to a
> DirectScan by the ConvertCountToDirectScanRule which utilizes the row group
> metadata; however this rule is applied on Drill Logical rels and converts the
> logical plan to a physical plan with DirectScanPrel but this is too late
> since the DrillScanRel that is already created during logical planning has
> already read the entire metadata cache file along with its full list of row
> group entries. The metadata cache file can grow quite large and this does not
> scale.
> The solution is to use the Metadata Summary file that is created in
> DRILL-7063 and create a new rule that will apply early on such that it
> operates on the Calcite logical rels instead of the Drill logical rels and
> prevents eager expansion of the list of files/row groups.
> We will not remove the existing rule. The existing rule will continue to
> operate as before because it is possible that after some transformations, we
> still want to apply the optimizations for COUNT queries.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)