[
https://issues.apache.org/jira/browse/DRILL-4735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16114732#comment-16114732
]
ASF GitHub Bot commented on DRILL-4735:
---------------------------------------
Github user jinfengni commented on a diff in the pull request:
https://github.com/apache/drill/pull/882#discussion_r131447579
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
---
@@ -85,109 +91,231 @@ protected ConvertCountToDirectScan(RelOptRuleOperand
rule, String id) {
@Override
public void onMatch(RelOptRuleCall call) {
final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0);
- final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1);
- final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel)
call.rel(1) : null;
+ final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -
1);
+ final DrillProjectRel project = call.rels.length == 3 ?
(DrillProjectRel) call.rel(1) : null;
final GroupScan oldGrpScan = scan.getGroupScan();
final PlannerSettings settings =
PrelUtil.getPlannerSettings(call.getPlanner());
- // Only apply the rule when :
+ // Only apply the rule when:
// 1) scan knows the exact row count in getSize() call,
// 2) No GroupBY key,
- // 3) only one agg function (Check if it's count(*) below).
- // 4) No distinct agg call.
+ // 3) No distinct agg call.
if
(!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount()
&& agg.getGroupCount() == 0
- && agg.getAggCallList().size() == 1
&& !agg.containsDistinctCall())) {
return;
}
- AggregateCall aggCall = agg.getAggCallList().get(0);
-
- if (aggCall.getAggregation().getName().equals("COUNT") ) {
-
- long cnt = 0;
- // count(*) == > empty arg ==> rowCount
- // count(Not-null-input) ==> rowCount
- if (aggCall.getArgList().isEmpty() ||
- (aggCall.getArgList().size() == 1 &&
- !
agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable()))
{
- cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount();
- } else if (aggCall.getArgList().size() == 1) {
- // count(columnName) ==> Agg ( Scan )) ==> columnValueCount
- int index = aggCall.getArgList().get(0);
-
- if (proj != null) {
- // project in the middle of Agg and Scan : Only when input of
AggCall is a RexInputRef in Project, we find the index of Scan's field.
- // For instance,
- // Agg - count($0)
- // \
- // Proj - Exp={$1}
- // \
- // Scan (col1, col2).
- // return count of "col2" in Scan's metadata, if found.
-
- if (proj.getProjects().get(index) instanceof RexInputRef) {
- index = ((RexInputRef)
proj.getProjects().get(index)).getIndex();
- } else {
- return; // do not apply for all other cases.
- }
- }
+ final CountsCollector countsCollector = new CountsCollector(settings);
+ // if counts were not collected, rule won't be applied
+ if (!countsCollector.collect(agg, scan, project)) {
+ return;
+ }
- String columnName =
scan.getRowType().getFieldNames().get(index).toLowerCase();
+ final RelDataType scanRowType = constructDataType(agg);
- cnt =
oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName));
- if (cnt == GroupScan.NO_COLUMN_STATS) {
- // if column stats are not available don't apply this rule
- return;
- }
- } else {
- return; // do nothing.
- }
+ final DynamicPojoRecordReader<Long> reader = new
DynamicPojoRecordReader<>(
+ buildSchema(scanRowType.getFieldNames()),
+ Collections.singletonList(countsCollector.getCounts()));
- RelDataType scanRowType =
getCountDirectScanRowType(agg.getCluster().getTypeFactory());
+ final ScanStats scanStats = new
ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1,
scanRowType.getFieldCount());
+ final GroupScan directScan = new MetadataDirectGroupScan(reader,
oldGrpScan.getFiles(), scanStats);
- final ScanPrel newScan = ScanPrel.create(scan,
-
scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON),
getCountDirectScan(cnt),
- scanRowType);
+ final ScanPrel newScan = ScanPrel.create(scan,
+
scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON),
directScan,
+ scanRowType);
- List<RexNode> exprs = Lists.newArrayList();
- exprs.add(RexInputRef.of(0, scanRowType));
+ final ProjectPrel newProject = new ProjectPrel(agg.getCluster(),
agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
+ .plus(DrillDistributionTrait.SINGLETON), newScan,
prepareFieldExpressions(scanRowType), agg.getRowType());
- final ProjectPrel newProj = new ProjectPrel(agg.getCluster(),
agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
- .plus(DrillDistributionTrait.SINGLETON), newScan, exprs,
agg.getRowType());
+ call.transformTo(newProject);
+ }
- call.transformTo(newProj);
+ /**
+ * For each aggregate call creates field with "count$" prefix and bigint
type.
+ * Constructs record type for created fields.
+ *
+ * @param aggregateRel aggregate relation expression
+ * @return record type
+ */
+ private RelDataType constructDataType(DrillAggregateRel aggregateRel) {
+ List<RelDataTypeField> fields = new ArrayList<>();
+ for (int i = 0; i < aggregateRel.getAggCallList().size(); i++) {
+ RelDataTypeField field = new RelDataTypeFieldImpl("count$" + i, i,
aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT));
+ fields.add(field);
}
-
+ return new RelRecordType(fields);
}
/**
- * Class to represent the count aggregate result.
+ * Builds schema based on given field names.
+ * Type for each field is set to long.class.
+ *
+ * @param fieldNames field names
+ * @return schema
*/
- public static class CountQueryResult {
- public long count;
+ private LinkedHashMap<String, Class<?>> buildSchema(List<String>
fieldNames) {
+ LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
+ for (String fieldName: fieldNames) {
+ schema.put(fieldName, long.class);
+ }
+ return schema;
+ }
- public CountQueryResult(long cnt) {
- this.count = cnt;
+ /**
+ * For each field creates row expression.
+ *
+ * @param rowType row type
+ * @return list of row expressions
+ */
+ private List<RexNode> prepareFieldExpressions(RelDataType rowType) {
+ List<RexNode> expressions = new ArrayList<>();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ expressions.add(RexInputRef.of(i, rowType));
}
+ return expressions;
}
- private RelDataType getCountDirectScanRowType(RelDataTypeFactory
typeFactory) {
- List<RelDataTypeField> fields = Lists.newArrayList();
- fields.add(new RelDataTypeFieldImpl("count", 0,
typeFactory.createSqlType(SqlTypeName.BIGINT)));
+ /**
+ * Helper class to collect counts based on metadata information.
+ * For example, for parquet files it can be obtained from parquet footer
(total row count)
+ * or from parquet metadata files (column counts).
+ */
+ private class CountsCollector {
+
+ private final PlannerSettings settings;
+ private final Set<String> implicitColumnsNames;
+ private final List<SchemaPath> columns;
+ private final List<Long> counts;
+
+ CountsCollector(PlannerSettings settings) {
+ this.settings = settings;
+ this.implicitColumnsNames =
ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
+ this.counts = new ArrayList<>();
+ this.columns = new ArrayList<>();
+ }
- return new RelRecordType(fields);
- }
+ /**
+ * Collects counts for each aggregation call.
+ * Will fail to collect counts if:
+ * <ol>
+ * <li>was not able to determine count for at least one aggregation
call</li>
+ * <li>count is used for a file system partition column</li>
+ * </ol>
+ *
+ * @param agg aggregate relational expression
+ * @param scan scan relational expression
+ * @param project project relational expression
+ *
+ * @return true if counts were collected, false otherwise
+ */
+ boolean collect(DrillAggregateRel agg, DrillScanRel scan,
DrillProjectRel project) {
+ return calculateCounts(agg, scan, project) &&
!containsPartitionColumns();
--- End diff --
If a user calls collect(agg, scan, project), the class should clear/reset all
of its internal collections, such as "columns" and "counts". Otherwise, what
happens if the user reuses the same CountsCollector instance and calls
collect() twice with different inputs?
> Count(dir0) on parquet returns 0 result
> ---------------------------------------
>
> Key: DRILL-4735
> URL: https://issues.apache.org/jira/browse/DRILL-4735
> Project: Apache Drill
> Issue Type: Bug
> Components: Query Planning & Optimization, Storage - Parquet
> Affects Versions: 1.0.0, 1.4.0, 1.6.0, 1.7.0
> Reporter: Krystal
> Assignee: Arina Ielchiieva
> Priority: Critical
>
> Selecting a count of dir0, dir1, etc against a parquet directory returns 0
> rows.
> select count(dir0) from `min_max_dir`;
> +---------+
> | EXPR$0 |
> +---------+
> | 0 |
> +---------+
> select count(dir1) from `min_max_dir`;
> +---------+
> | EXPR$0 |
> +---------+
> | 0 |
> +---------+
> If I put both dir0 and dir1 in the same select, it returns expected result:
> select count(dir0), count(dir1) from `min_max_dir`;
> +---------+---------+
> | EXPR$0 | EXPR$1 |
> +---------+---------+
> | 600 | 600 |
> +---------+---------+
> Here is the physical plan for count(dir0) query:
> {code}
> 00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 20.0,
> cumulative cost = {22.0 rows, 22.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id
> = 1346
> 00-01 Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0):
> rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network,
> 0.0 memory}, id = 1345
> 00-02 Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0):
> rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network,
> 0.0 memory}, id = 1344
> 00-03
> Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@3da85d3b[columns
> = null, isStarQuery = false, isSkipQuery = false]]) : rowType =
> RecordType(BIGINT count): rowcount = 20.0, cumulative cost = {20.0 rows, 20.0
> cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1343
> {code}
> Here is part of the explain plan for the count(dir0) and count(dir1) in the
> same select:
> {code}
> 00-00 Screen : rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1):
> rowcount = 60.0, cumulative cost = {1206.0 rows, 15606.0 cpu, 0.0 io, 0.0
> network, 0.0 memory}, id = 1623
> 00-01 Project(EXPR$0=[$0], EXPR$1=[$1]) : rowType = RecordType(BIGINT
> EXPR$0, BIGINT EXPR$1): rowcount = 60.0, cumulative cost = {1200.0 rows,
> 15600.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1622
> 00-02 StreamAgg(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[COUNT($1)]) :
> rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): rowcount = 60.0,
> cumulative cost = {1200.0 rows, 15600.0 cpu, 0.0 io, 0.0 network, 0.0
> memory}, id = 1621
> 00-03 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath
> [path=maprfs:/drill/testdata/min_max_dir/1999/Apr/voter20.parquet/0_0_0.parquet],
> ReadEntryWithPath
> [path=maprfs:/drill/testdata/min_max_dir/1999/MAR/voter15.parquet/0_0_0.parquet],
> ReadEntryWithPath
> [path=maprfs:/drill/testdata/min_max_dir/1985/jan/voter5.parquet/0_0_0.parquet],
> ReadEntryWithPath
> [path=maprfs:/drill/testdata/min_max_dir/1985/apr/voter60.parquet/0_0_0.parquet],...,
> ReadEntryWithPath
> [path=maprfs:/drill/testdata/min_max_dir/2014/jul/voter35.parquet/0_0_0.parquet]],
> selectionRoot=maprfs:/drill/testdata/min_max_dir, numFiles=16,
> usedMetadataFile=false, columns=[`dir0`, `dir1`]]]) : rowType =
> RecordType(ANY dir0, ANY dir1): rowcount = 600.0, cumulative cost = {600.0
> rows, 1200.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1620
> {code}
> Notice that in the first case,
> "org.apache.drill.exec.store.pojo.PojoRecordReader" is used.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)