vvysotskyi commented on a change in pull request #1907: DRILL-7450: Improve performance for ANALYZE command URL: https://github.com/apache/drill/pull/1907#discussion_r351212269
########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.expr.IsPredicate; +import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils; +import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext; +import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.store.ColumnExplorer; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.store.direct.DirectGroupScan; +import 
org.apache.drill.exec.store.parquet.ParquetGroupScan; +import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader; +import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.drill.exec.util.Utilities; +import org.apache.drill.metastore.metadata.MetadataType; +import org.apache.drill.metastore.metadata.RowGroupMetadata; +import org.apache.drill.metastore.statistics.ColumnStatistics; +import org.apache.drill.metastore.statistics.ColumnStatisticsKind; +import org.apache.drill.metastore.statistics.ExactStatisticsConstants; +import org.apache.drill.metastore.statistics.StatisticsKind; +import org.apache.drill.metastore.statistics.TableStatisticsKind; +import org.apache.drill.shaded.guava.com.google.common.collect.HashBasedTable; +import org.apache.drill.shaded.guava.com.google.common.collect.Multimap; +import org.apache.drill.shaded.guava.com.google.common.collect.Table; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Rule which converts + * + * <pre> + * MetadataAggRel(metadataLevel=ROW_GROUP) + * \ + * DrillScanRel + * </pre> + * <p/> + * plan into + * <pre> + * DrillDirectScanRel + * </pre> + * where {@link DrillDirectScanRel} is populated with row group metadata. 
+ * <p/> + * For the case when aggregate level is not ROW_GROUP, resulting plan will be the following: + * + * <pre> + * MetadataAggRel(metadataLevel=FILE (or another non-ROW_GROUP value), createNewAggregations=false) + * \ + * DrillDirectScanRel + * </pre> + */ +public class ConvertMetadataAggregateToDirectScanRule extends RelOptRule { + public static final ConvertMetadataAggregateToDirectScanRule INSTANCE = + new ConvertMetadataAggregateToDirectScanRule(); + + private static final Logger logger = LoggerFactory.getLogger(ConvertMetadataAggregateToDirectScanRule.class); + + public ConvertMetadataAggregateToDirectScanRule() { + super( + RelOptHelper.some(MetadataAggRel.class, RelOptHelper.any(DrillScanRel.class)), + DrillRelFactories.LOGICAL_BUILDER, "ConvertMetadataAggregateToDirectScanRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + MetadataAggRel agg = call.rel(0); + DrillScanRel scan = call.rel(1); + + GroupScan oldGrpScan = scan.getGroupScan(); + PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); + + // Only apply the rule for parquet group scan and for the case when required column metadata is present + if (!(oldGrpScan instanceof ParquetGroupScan) + || (oldGrpScan.getTableMetadata().getInterestingColumns() != null + && !oldGrpScan.getTableMetadata().getInterestingColumns().containsAll(agg.getContext().interestingColumns()))) { + return; + } + + try { + DirectGroupScan directScan = buildDirectScan(agg.getContext().interestingColumns(), scan, settings); + if (directScan == null) { + logger.warn("Unable to use parquet metadata for ANALYZE since some required metadata is absent within parquet metadata"); + return; + } + + RelNode converted = new DrillDirectScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + directScan, scan.getRowType()); + if (agg.getContext().metadataLevel() != MetadataType.ROW_GROUP) { + MetadataAggregateContext updatedContext = agg.getContext().toBuilder() + 
.createNewAggregations(false) + .build(); + converted = new MetadataAggRel(agg.getCluster(), agg.getTraitSet(), converted, updatedContext); + } + + call.transformTo(converted); + } catch (Exception e) { + logger.warn("Unable to use parquet metadata for ANALYZE due to exception {}", e.getMessage(), e); + } + } + + private DirectGroupScan buildDirectScan(List<SchemaPath> interestingColumns, DrillScanRel scan, PlannerSettings settings) throws IOException { + DrillTable drillTable = Utilities.getDrillTable(scan.getTable()); + + Map<String, Class<?>> schema = new HashMap<>(); + + // populates schema and fieldIndexes to be used when adding record values + FormatSelection selection = (FormatSelection) drillTable.getSelection(); + // adds partition columns to the schema + for (String partitionColumnName : ColumnExplorer.getPartitionColumnNames(selection.getSelection(), settings.getOptions())) { + schema.put(partitionColumnName, String.class); + } + + String rgi = settings.getOptions().getOption(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL).string_val; + String rgs = settings.getOptions().getOption(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL).string_val; + String rgl = settings.getOptions().getOption(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL).string_val; + String lmt = settings.getOptions().getOption(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL).string_val; + + // adds internal implicit columns to the schema + + schema.put(MetastoreAnalyzeConstants.SCHEMA_FIELD, String.class); + schema.put(MetastoreAnalyzeConstants.LOCATION_FIELD, String.class); + schema.put(rgi, String.class); + schema.put(rgs, String.class); + schema.put(rgl, String.class); + schema.put(lmt, String.class); + + return populateRecords(interestingColumns, schema, scan, settings); + } + + /** + * Populates records list with row group metadata. 
+ */ + private DirectGroupScan populateRecords(Collection<SchemaPath> interestingColumns, Map<String, Class<?>> schema, + DrillScanRel scan, PlannerSettings settings) throws IOException { + ParquetGroupScan parquetGroupScan = (ParquetGroupScan) scan.getGroupScan(); + DrillTable drillTable = Utilities.getDrillTable(scan.getTable()); + + Multimap<Path, RowGroupMetadata> rowGroupsMetadataMap = parquetGroupScan.getMetadataProvider().getRowGroupsMetadataMap(); + + Table<String, Integer, Object> recordsTable = HashBasedTable.create(); + FormatSelection selection = (FormatSelection) drillTable.getSelection(); + List<String> partitionColumnNames = ColumnExplorer.getPartitionColumnNames(selection.getSelection(), settings.getOptions()); + + String rgi = settings.getOptions().getOption(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL).string_val; + String rgs = settings.getOptions().getOption(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL).string_val; + String rgl = settings.getOptions().getOption(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL).string_val; + String lmt = settings.getOptions().getOption(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL).string_val; + + FileSystem rawFs = selection.getSelection().getSelectionRoot().getFileSystem(new Configuration()); + DrillFileSystem fileSystem = + ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), rawFs.getConf()); + + int rowIndex = 0; + for (Map.Entry<Path, RowGroupMetadata> rgEntry : rowGroupsMetadataMap.entries()) { + Path path = rgEntry.getKey(); + RowGroupMetadata rowGroupMetadata = rgEntry.getValue(); + List<String> partitionValues = ColumnExplorer.listPartitionValues(path, selection.getSelection().getSelectionRoot(), false); + for (int i = 0; i < partitionValues.size(); i++) { + String partitionColumnName = partitionColumnNames.get(i); + recordsTable.put(partitionColumnName, rowIndex, partitionValues.get(i)); + } + + recordsTable.put(MetastoreAnalyzeConstants.LOCATION_FIELD, 
rowIndex, ColumnExplorer.ImplicitFileColumns.FQN.getValue(path)); + recordsTable.put(rgi, rowIndex, String.valueOf(rowGroupMetadata.getRowGroupIndex())); + + if (interestingColumns == null) { + interestingColumns = rowGroupMetadata.getColumnsStatistics().keySet(); + } + + // populates record list with row group column metadata + for (SchemaPath schemaPath : interestingColumns) { + ColumnStatistics columnStatistics = rowGroupMetadata.getColumnsStatistics().get(schemaPath); + if (IsPredicate.isNullOrEmpty(columnStatistics)) { + logger.warn("Statistics for {} column wasn't found within {} row group.", schemaPath, path); Review comment: Debug level should be used there — thanks, updated. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
