[ https://issues.apache.org/jira/browse/DRILL-4735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16114729#comment-16114729 ]
ASF GitHub Bot commented on DRILL-4735: --------------------------------------- Github user jinfengni commented on a diff in the pull request: https://github.com/apache/drill/pull/882#discussion_r131445381 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java --- @@ -85,109 +91,231 @@ protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) { @Override public void onMatch(RelOptRuleCall call) { final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0); - final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1); - final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null; + final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length - 1); + final DrillProjectRel project = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null; final GroupScan oldGrpScan = scan.getGroupScan(); final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); - // Only apply the rule when : + // Only apply the rule when: // 1) scan knows the exact row count in getSize() call, // 2) No GroupBY key, - // 3) only one agg function (Check if it's count(*) below). - // 4) No distinct agg call. + // 3) No distinct agg call. if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount() && agg.getGroupCount() == 0 - && agg.getAggCallList().size() == 1 && !agg.containsDistinctCall())) { return; } - AggregateCall aggCall = agg.getAggCallList().get(0); - - if (aggCall.getAggregation().getName().equals("COUNT") ) { - - long cnt = 0; - // count(*) == > empty arg ==> rowCount - // count(Not-null-input) ==> rowCount - if (aggCall.getArgList().isEmpty() || - (aggCall.getArgList().size() == 1 && - ! 
agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) { - cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount(); - } else if (aggCall.getArgList().size() == 1) { - // count(columnName) ==> Agg ( Scan )) ==> columnValueCount - int index = aggCall.getArgList().get(0); - - if (proj != null) { - // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef in Project, we find the index of Scan's field. - // For instance, - // Agg - count($0) - // \ - // Proj - Exp={$1} - // \ - // Scan (col1, col2). - // return count of "col2" in Scan's metadata, if found. - - if (proj.getProjects().get(index) instanceof RexInputRef) { - index = ((RexInputRef) proj.getProjects().get(index)).getIndex(); - } else { - return; // do not apply for all other cases. - } - } + final CountsCollector countsCollector = new CountsCollector(settings); + // if counts were not collected, rule won't be applied + if (!countsCollector.collect(agg, scan, project)) { + return; + } - String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase(); + final RelDataType scanRowType = constructDataType(agg); - cnt = oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName)); - if (cnt == GroupScan.NO_COLUMN_STATS) { - // if column stats are not available don't apply this rule - return; - } - } else { - return; // do nothing. 
- } + final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>( + buildSchema(scanRowType.getFieldNames()), + Collections.singletonList(countsCollector.getCounts())); - RelDataType scanRowType = getCountDirectScanRowType(agg.getCluster().getTypeFactory()); + final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount()); + final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats); - final ScanPrel newScan = ScanPrel.create(scan, - scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), getCountDirectScan(cnt), - scanRowType); + final ScanPrel newScan = ScanPrel.create(scan, + scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan, + scanRowType); - List<RexNode> exprs = Lists.newArrayList(); - exprs.add(RexInputRef.of(0, scanRowType)); + final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL) + .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType()); - final ProjectPrel newProj = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL) - .plus(DrillDistributionTrait.SINGLETON), newScan, exprs, agg.getRowType()); + call.transformTo(newProject); + } - call.transformTo(newProj); + /** + * For each aggregate call creates field with "count$" prefix and bigint type. + * Constructs record type for created fields. 
+ * + * @param aggregateRel aggregate relation expression + * @return record type + */ + private RelDataType constructDataType(DrillAggregateRel aggregateRel) { + List<RelDataTypeField> fields = new ArrayList<>(); + for (int i = 0; i < aggregateRel.getAggCallList().size(); i++) { + RelDataTypeField field = new RelDataTypeFieldImpl("count$" + i, i, aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT)); --- End diff -- The field names "count$0", "count$1" in RelDataTypeField are not very informative. I think we could either get the field name from the aggregate operator, or use "count$colname" to indicate which column's count it represents. > Count(dir0) on parquet returns 0 result > --------------------------------------- > > Key: DRILL-4735 > URL: https://issues.apache.org/jira/browse/DRILL-4735 > Project: Apache Drill > Issue Type: Bug > Components: Query Planning & Optimization, Storage - Parquet > Affects Versions: 1.0.0, 1.4.0, 1.6.0, 1.7.0 > Reporter: Krystal > Assignee: Arina Ielchiieva > Priority: Critical > > Selecting a count of dir0, dir1, etc against a parquet directory returns 0 > rows. 
> select count(dir0) from `min_max_dir`; > +---------+ > | EXPR$0 | > +---------+ > | 0 | > +---------+ > select count(dir1) from `min_max_dir`; > +---------+ > | EXPR$0 | > +---------+ > | 0 | > +---------+ > If I put both dir0 and dir1 in the same select, it returns expected result: > select count(dir0), count(dir1) from `min_max_dir`; > +---------+---------+ > | EXPR$0 | EXPR$1 | > +---------+---------+ > | 600 | 600 | > +---------+---------+ > Here is the physical plan for count(dir0) query: > {code} > 00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 20.0, > cumulative cost = {22.0 rows, 22.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id > = 1346 > 00-01 Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): > rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, > 0.0 memory}, id = 1345 > 00-02 Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): > rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, > 0.0 memory}, id = 1344 > 00-03 > Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@3da85d3b[columns > = null, isStarQuery = false, isSkipQuery = false]]) : rowType = > RecordType(BIGINT count): rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 > cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1343 > {code} > Here is part of the explain plan for the count(dir0) and count(dir1) in the > same select: > {code} > 00-00 Screen : rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): > rowcount = 60.0, cumulative cost = {1206.0 rows, 15606.0 cpu, 0.0 io, 0.0 > network, 0.0 memory}, id = 1623 > 00-01 Project(EXPR$0=[$0], EXPR$1=[$1]) : rowType = RecordType(BIGINT > EXPR$0, BIGINT EXPR$1): rowcount = 60.0, cumulative cost = {1200.0 rows, > 15600.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1622 > 00-02 StreamAgg(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[COUNT($1)]) : > rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): rowcount = 60.0, > cumulative cost = {1200.0 rows, 15600.0 cpu, 0.0 io, 
0.0 network, 0.0 > memory}, id = 1621 > 00-03 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/1999/Apr/voter20.parquet/0_0_0.parquet], > ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/1999/MAR/voter15.parquet/0_0_0.parquet], > ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/1985/jan/voter5.parquet/0_0_0.parquet], > ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/1985/apr/voter60.parquet/0_0_0.parquet],..., > ReadEntryWithPath > [path=maprfs:/drill/testdata/min_max_dir/2014/jul/voter35.parquet/0_0_0.parquet]], > selectionRoot=maprfs:/drill/testdata/min_max_dir, numFiles=16, > usedMetadataFile=false, columns=[`dir0`, `dir1`]]]) : rowType = > RecordType(ANY dir0, ANY dir1): rowcount = 600.0, cumulative cost = {600.0 > rows, 1200.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1620 > {code} > Notice that in the first case, > "org.apache.drill.exec.store.pojo.PojoRecordReader" is used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)