[ https://issues.apache.org/jira/browse/DRILL-4735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16114728#comment-16114728 ]
ASF GitHub Bot commented on DRILL-4735: --------------------------------------- Github user jinfengni commented on a diff in the pull request: https://github.com/apache/drill/pull/882#discussion_r131447047 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java --- @@ -85,109 +91,231 @@ protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) { @Override public void onMatch(RelOptRuleCall call) { final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0); - final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1); - final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null; + final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length - 1); + final DrillProjectRel project = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null; final GroupScan oldGrpScan = scan.getGroupScan(); final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); - // Only apply the rule when : + // Only apply the rule when: // 1) scan knows the exact row count in getSize() call, // 2) No GroupBY key, - // 3) only one agg function (Check if it's count(*) below). - // 4) No distinct agg call. + // 3) No distinct agg call. if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount() && agg.getGroupCount() == 0 - && agg.getAggCallList().size() == 1 && !agg.containsDistinctCall())) { return; } - AggregateCall aggCall = agg.getAggCallList().get(0); - - if (aggCall.getAggregation().getName().equals("COUNT") ) { - - long cnt = 0; - // count(*) == > empty arg ==> rowCount - // count(Not-null-input) ==> rowCount - if (aggCall.getArgList().isEmpty() || - (aggCall.getArgList().size() == 1 && - ! agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) { - cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount(); - } else if (aggCall.getArgList().size() == 1) { - // count(columnName) ==> Agg ( Scan )) ==> columnValueCount - int index = aggCall.getArgList().get(0); - - if (proj != null) { - // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef in Project, we find the index of Scan's field. - // For instance, - // Agg - count($0) - // \ - // Proj - Exp={$1} - // \ - // Scan (col1, col2). - // return count of "col2" in Scan's metadata, if found. - - if (proj.getProjects().get(index) instanceof RexInputRef) { - index = ((RexInputRef) proj.getProjects().get(index)).getIndex(); - } else { - return; // do not apply for all other cases. - } - } + final CountsCollector countsCollector = new CountsCollector(settings); + // if counts were not collected, rule won't be applied + if (!countsCollector.collect(agg, scan, project)) { + return; + } - String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase(); + final RelDataType scanRowType = constructDataType(agg); - cnt = oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName)); - if (cnt == GroupScan.NO_COLUMN_STATS) { - // if column stats are not available don't apply this rule - return; - } - } else { - return; // do nothing. - } + final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>( + buildSchema(scanRowType.getFieldNames()), + Collections.singletonList(countsCollector.getCounts())); - RelDataType scanRowType = getCountDirectScanRowType(agg.getCluster().getTypeFactory()); + final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount()); + final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats); - final ScanPrel newScan = ScanPrel.create(scan, - scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), getCountDirectScan(cnt), - scanRowType); + final ScanPrel newScan = ScanPrel.create(scan, + scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan, + scanRowType); - List<RexNode> exprs = Lists.newArrayList(); - exprs.add(RexInputRef.of(0, scanRowType)); + final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL) + .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType()); - final ProjectPrel newProj = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL) - .plus(DrillDistributionTrait.SINGLETON), newScan, exprs, agg.getRowType()); + call.transformTo(newProject); + } - call.transformTo(newProj); + /** + * For each aggregate call creates field with "count$" prefix and bigint type. + * Constructs record type for created fields. + * + * @param aggregateRel aggregate relation expression + * @return record type + */ + private RelDataType constructDataType(DrillAggregateRel aggregateRel) { + List<RelDataTypeField> fields = new ArrayList<>(); + for (int i = 0; i < aggregateRel.getAggCallList().size(); i++) { + RelDataTypeField field = new RelDataTypeFieldImpl("count$" + i, i, aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT)); + fields.add(field); } - + return new RelRecordType(fields); } /** - * Class to represent the count aggregate result. + * Builds schema based on given field names. + * Type for each schema is set to long.class. + * + * @param fieldNames field names + * @return schema */ - public static class CountQueryResult { - public long count; + private LinkedHashMap<String, Class<?>> buildSchema(List<String> fieldNames) { + LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>(); + for (String fieldName: fieldNames) { + schema.put(fieldName, long.class); + } + return schema; + } - public CountQueryResult(long cnt) { - this.count = cnt; + /** + * For each field creates row expression. + * + * @param rowType row type + * @return list of row expressions + */ + private List<RexNode> prepareFieldExpressions(RelDataType rowType) { + List<RexNode> expressions = new ArrayList<>(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + expressions.add(RexInputRef.of(i, rowType)); } + return expressions; } - private RelDataType getCountDirectScanRowType(RelDataTypeFactory typeFactory) { - List<RelDataTypeField> fields = Lists.newArrayList(); - fields.add(new RelDataTypeFieldImpl("count", 0, typeFactory.createSqlType(SqlTypeName.BIGINT))); + /** + * Helper class to collect counts based on metadata information. + * For example, for parquet files it can be obtained from parquet footer (total row count) + * or from parquet metadata files (column counts). + */ + private class CountsCollector { + + private final PlannerSettings settings; + private final Set<String> implicitColumnsNames; + private final List<SchemaPath> columns; + private final List<Long> counts; + + CountsCollector(PlannerSettings settings) { + this.settings = settings; + this.implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet(); + this.counts = new ArrayList<>(); + this.columns = new ArrayList<>(); + } - return new RelRecordType(fields); - } + /** + * Collects counts for each aggregation call. + * Will fail to collect counts if: + * <ol> + * <li>was not able to determine count for at least one aggregation call</li> + * <li>count if used for file system partition column</li> + * </ol> + * + * @param agg aggregate relational expression + * @param scan scan relational expression + * @param project project relational expression + * + * @return true if counts were collected, false otherwise + */ + boolean collect(DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) { + return calculateCounts(agg, scan, project) && !containsPartitionColumns(); + } + + /** + * @return list of counts + */ + List<Long> getCounts() { --- End diff -- In stead of returning list of Long, can we return either a map from schemaPath to counts, or list of pair schemaPath, count? Returning just the lists of counts make it hard to figure out which count corresponds to which aggregate function. > Count(dir0) on parquet returns 0 result > --------------------------------------- > > Key: DRILL-4735 > URL: https://issues.apache.org/jira/browse/DRILL-4735 > Project: Apache Drill > Issue Type: Bug > Components: Query Planning & Optimization, Storage - Parquet > Affects Versions: 1.0.0, 1.4.0, 1.6.0, 1.7.0 > Reporter: Krystal > Assignee: Arina Ielchiieva > Priority: Critical > > Selecting a count of dir0, dir1, etc against a parquet directory returns 0 > rows. > select count(dir0) from `min_max_dir`; > +---------+ > | EXPR$0 | > +---------+ > | 0 | > +---------+ > select count(dir1) from `min_max_dir`; > +---------+ > | EXPR$0 | > +---------+ > | 0 | > +---------+ > If I put both dir0 and dir1 in the same select, it returns expected result: > select count(dir0), count(dir1) from `min_max_dir`; > +---------+---------+ > | EXPR$0 | EXPR$1 | > +---------+---------+ > | 600 | 600 | > +---------+---------+ > Here is the physical plan for count(dir0) query: > {code} > 00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 20.0, > cumulative cost = {22.0 rows, 22.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id > = 1346 > 00-01 Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): > rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, > 0.0 memory}, id = 1345 > 00-02 Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): > rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, > 0.0 memory}, id = 1344 > 00-03 > Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@3da85d3b[columns > = null, isStarQuery = false, isSkipQuery = false]]) : rowType = > RecordType(BIGINT count): rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 > cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1343 > {code} > Here is part of the explain plan for the count(dir0) and count(dir1) in the > same select: > {code} > 00-00 Screen : rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): > rowcount = 60.0, cumulative cost = {1206.0 rows, 15606.0 cpu, 0.0 io, 0.0 > network, 0.0 memory}, id = 1623 > 00-01 Project(EXPR$0=[$0], EXPR$1=[$1]) : rowType = RecordType(BIGINT > EXPR$0, BIGINT EXPR$1): rowcount = 60.0, cumulative cost = {1200.0 rows, > 15600.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1622 > 00-02 StreamAgg(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[COUNT($1)]) : > rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): rowcount = 60.0, > cumulative cost = {1200.0 rows, 15600.0 cpu, 0.0 io, 0.0 network, 0.0 > memory}, id = 1621 > 00-03 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/1999/Apr/voter20.parquet/0_0_0.parquet], > ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/1999/MAR/voter15.parquet/0_0_0.parquet], > ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/1985/jan/voter5.parquet/0_0_0.parquet], > ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/1985/apr/voter60.parquet/0_0_0.parquet],..., > ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/2014/jul/voter35.parquet/0_0_0.parquet]], > selectionRoot=maprfs:/drill/testdata/min_max_dir, numFiles=16, > usedMetadataFile=false, columns=[`dir0`, `dir1`]]]) : rowType = > RecordType(ANY dir0, ANY dir1): rowcount = 600.0, cumulative cost = {600.0 > rows, 1200.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1620 > {code} > Notice that in the first case, > "org.apache.drill.exec.store.pojo.PojoRecordReader" is used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)