dvjyothsna commented on a change in pull request #1736: DRILL-7064: Leverage the summary metadata for plain COUNT aggregates. URL: https://github.com/apache/drill/pull/1736#discussion_r273161135
########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexInputRef; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.FormatPluginConfig; + +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.planner.common.CountToDirectScanUtils; +import org.apache.drill.exec.planner.common.DrillRelOptUtil; + +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.store.ColumnExplorer; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig; +import org.apache.drill.exec.store.direct.MetadataDirectGroupScan; +import org.apache.drill.exec.store.parquet.ParquetFormatConfig; +import org.apache.drill.exec.store.parquet.ParquetReaderConfig; +import org.apache.drill.exec.store.parquet.metadata.Metadata; +import org.apache.drill.exec.store.parquet.metadata.Metadata_V4; +import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.Set; + +/** + * <p> This rule is a logical planning counterpart to a corresponding <b>ConvertCountToDirectScanPrule</b> + * physical rule + * </p> + * <p> + * This rule will convert <b>" select count(*) as mycount from table "</b> + * or <b>" select count(not-nullable-expr) as mycount from table "</b> into + * <pre> + * Project(mycount) + * \ + * DirectGroupScan ( PojoRecordReader ( rowCount )) + *</pre> + * or <b>" select count(column) as mycount from table "</b> into + * <pre> + * Project(mycount) + * \ + * DirectGroupScan (PojoRecordReader (columnValueCount)) + *</pre> + * Rule can be applied if query contains multiple count expressions. + * <b>" select count(column1), count(column2), count(*) from table "</b> + * </p> + * + * <p> + * The rule utilizes the Parquet Metadata Cache's summary information to retrieve the total row count + * and the per-column null count. As such, the rule is only applicable for Parquet tables and only if the + * metadata cache has been created with the summary information. + * </p> + */ +public class ConvertCountToDirectScanRule extends RelOptRule { + + public static final RelOptRule AGG_ON_PROJ_ON_SCAN = new ConvertCountToDirectScanRule( + RelOptHelper.some(Aggregate.class, + RelOptHelper.some(Project.class, + RelOptHelper.any(TableScan.class))), "Agg_on_proj_on_scan:logical"); + + public static final RelOptRule AGG_ON_SCAN = new ConvertCountToDirectScanRule( + RelOptHelper.some(Aggregate.class, + RelOptHelper.any(TableScan.class)), "Agg_on_scan:logical"); + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanRule.class); + + protected ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) { + super(rule, "ConvertCountToDirectScan:logical:" + id); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final Aggregate agg = (Aggregate) call.rel(0); + final TableScan scan = (TableScan) call.rel(call.rels.length - 1); + final Project project = call.rels.length == 3 ? (Project) call.rel(1) : null; + + // Qualifying conditions for rule: + // 1) There's no GroupBY key, + // 2) Agg is not a DISTINCT agg + // 3) Additional checks are done further below .. + if (agg.getGroupCount() > 0 || + agg.containsDistinctCall()) { + return; + } + + DrillTable drillTable = DrillRelOptUtil.getDrillTable(scan); + + if (drillTable == null) { + logger.debug("Rule does not apply since an eligible drill table instance was not found."); + return; + } + + Object selection = drillTable.getSelection(); + + if (!(selection instanceof FormatSelection)) { + logger.debug("Rule does not apply since only Parquet file format is eligible."); + return; + } + + // Rule is applicable only if the statistics for row count and null count are available from the metadata, + FormatSelection formatSelection = (FormatSelection) selection; + Pair<Boolean, Metadata_V4.MetadataSummary> status = checkMetadataForScanStats(drillTable, formatSelection); + + if (!status.getLeft()) { + logger.debug("Rule does not apply since MetadataSummary metadata was not found."); + return; + } + + PlannerSettings settings = call.getPlanner().getContext().unwrap(PlannerSettings.class); + Metadata_V4.MetadataSummary metadataSummary = status.getRight(); + Map<String, Long> result = collectCounts(settings, metadataSummary, agg, scan, project); + logger.trace("Calculated the following aggregate counts: ", result); + + // if counts could not be determined, rule won't be applied + if (result.isEmpty()) { + logger.debug("Rule does not apply since one or more COUNTs could not be determined from metadata."); + return; + } + + List<Path> fileList = + ImmutableList.of(Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot())); + + final RelDataType scanRowType = CountToDirectScanUtils.constructDataType(agg, result.keySet()); + + final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>( + CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()), + Collections.singletonList((List<Long>) new ArrayList<>(result.values()))); + + final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount()); + final MetadataDirectGroupScan directScan = new MetadataDirectGroupScan(reader, fileList, scanStats, true); + + final DrillDirectScanRel newScan = new DrillDirectScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + directScan, scanRowType); + + final DrillProjectRel newProject = new DrillProjectRel(agg.getCluster(), agg.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + newScan, CountToDirectScanUtils.prepareFieldExpressions(scanRowType), agg.getRowType()); + + call.transformTo(newProject); + } + + private Pair<Boolean, Metadata_V4.MetadataSummary> checkMetadataForScanStats(DrillTable drillTable, FormatSelection formatSelection) { + + // Currently only support metadata rowcount stats for Parquet tables + FormatPluginConfig formatConfig = formatSelection.getFormat(); + if (!((formatConfig instanceof ParquetFormatConfig) + || ((formatConfig instanceof NamedFormatPluginConfig) + && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) { + return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null); + } + + FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin(); + DrillFileSystem fs = null; + try { + fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf()); + } catch (IOException e) { + logger.warn("Unable to create the file system object for retrieving statistics from metadata cache file ", e); + return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null); + } + + // check if the cacheFileRoot has been set: this is needed because after directory pruning, the + // cacheFileRoot could have been changed and not be the same as the original selectionRoot + Path selectionRoot = formatSelection.getSelection().getCacheFileRoot() != null ? + formatSelection.getSelection().getCacheFileRoot() : + formatSelection.getSelection().getSelectionRoot(); + Metadata_V4.MetadataSummary metadataSummary = Metadata.getSummary(fs, selectionRoot, false, ParquetReaderConfig.builder().build()); Review comment: Can we do this ParquetReaderConfig parquetReaderConfig= ParquetReaderConfig.builder() .withFormatConfig((ParquetFormatConfig) formatConfig) .build(); Metadata.getSummary(fs, selectionRoot, false, parquetReaderConfig); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
