This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit c1ca512ea54cc7fe1b7ee581f21aa84a95aab14f Author: Gautam Parai <[email protected]> AuthorDate: Tue Mar 19 12:04:13 2019 -0700 DRILL-7148: Use improved join cardinality and ndv estimation with statistics closes #1744 --- .../exec/planner/common/DrillJoinRelBase.java | 34 ++-- .../drill/exec/planner/common/DrillRelOptUtil.java | 62 +++++++ .../drill/exec/planner/common/DrillStatsTable.java | 2 +- .../planner/cost/DrillRelMdDistinctRowCount.java | 181 +++++++++++++++++---- .../exec/planner/cost/DrillRelMdRowCount.java | 41 +++-- .../exec/planner/cost/DrillRelMdSelectivity.java | 40 +++-- .../exec/planner/physical/PlannerSettings.java | 6 + .../exec/server/options/SystemOptionManager.java | 1 + .../java/org/apache/drill/exec/util/Utilities.java | 2 +- .../java-exec/src/main/resources/drill-module.conf | 1 + .../org/apache/drill/exec/sql/TestAnalyze.java | 12 +- .../drill/metastore/ColumnStatisticsKind.java | 2 +- 12 files changed, 295 insertions(+), 89 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java index a6b6f4a..6150bf3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.planner.common; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -33,6 +34,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Pair; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.physical.impl.join.JoinUtils; @@ -102,26 +104,32 @@ public abstract class DrillJoinRelBase extends Join implements DrillJoin { return joinRowFactor * this.getLeft().estimateRowCount(mq) * this.getRight().estimateRowCount(mq); } - int[] joinFields = new int[2]; - LogicalJoin jr = LogicalJoin.create(this.getLeft(), this.getRight(), this.getCondition(), this.getVariablesSet(), this.getJoinType()); if (!DrillRelOptUtil.guessRows(this) //Statistics present for left and right side of the join - && jr.getJoinType() == JoinRelType.INNER - && DrillRelOptUtil.analyzeSimpleEquiJoin((Join)jr, joinFields)) { - ImmutableBitSet leq = ImmutableBitSet.of(joinFields[0]); - ImmutableBitSet req = ImmutableBitSet.of(joinFields[1]); + && jr.getJoinType() == JoinRelType.INNER) { + List<Pair<Integer, Integer>> joinConditions = DrillRelOptUtil.analyzeSimpleEquiJoin((Join)jr); + if (joinConditions.size() > 0) { + List<Integer> leftSide = new ArrayList<>(); + List<Integer> rightSide = new ArrayList<>(); + for (Pair<Integer, Integer> condition : joinConditions) { + leftSide.add(condition.left); + rightSide.add(condition.right); + } + ImmutableBitSet leq = ImmutableBitSet.of(leftSide); + ImmutableBitSet req = ImmutableBitSet.of(rightSide); - Double ldrc = mq.getDistinctRowCount(this.getLeft(), leq, null); - Double rdrc = mq.getDistinctRowCount(this.getRight(), req, null); + Double ldrc = mq.getDistinctRowCount(this.getLeft(), leq, null); + Double rdrc = mq.getDistinctRowCount(this.getRight(), req, null); - Double lrc = mq.getRowCount(this.getLeft()); - Double rrc = mq.getRowCount(this.getRight()); + Double lrc = 
mq.getRowCount(this.getLeft()); + Double rrc = mq.getRowCount(this.getRight()); - if (ldrc != null && rdrc != null && lrc != null && rrc != null) { - // Join cardinality = (lrc * rrc) / Math.max(ldrc, rdrc). Avoid overflow by dividing earlier - return (lrc / Math.max(ldrc, rdrc)) * rrc; + if (ldrc != null && rdrc != null && lrc != null && rrc != null) { + // Join cardinality = (lrc * rrc) / Math.max(ldrc, rdrc). Avoid overflow by dividing earlier + return (lrc / Math.max(ldrc, rdrc)) * rrc; + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java index 66499d6..3838bf9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java @@ -667,4 +667,66 @@ public abstract class DrillRelOptUtil { } return drillTable; } + + public static List<Pair<Integer, Integer>> analyzeSimpleEquiJoin(Join join) { + List<Pair<Integer, Integer>> joinConditions = new ArrayList<>(); + try { + RexVisitor<Void> visitor = + new RexVisitorImpl<Void>(true) { + public Void visitCall(RexCall call) { + if (call.getKind() == SqlKind.AND || call.getKind() == SqlKind.OR) { + super.visitCall(call); + } else { + if (call.getKind() == SqlKind.EQUALS) { + int leftFieldCount = join.getLeft().getRowType().getFieldCount(); + int rightFieldCount = join.getRight().getRowType().getFieldCount(); + RexNode leftComparand = call.operands.get(0); + RexNode rightComparand = call.operands.get(1); + RexInputRef leftFieldAccess = (RexInputRef) leftComparand; + RexInputRef rightFieldAccess = (RexInputRef) rightComparand; + if (leftFieldAccess.getIndex() >= leftFieldCount + rightFieldCount || + rightFieldAccess.getIndex() >= leftFieldCount + rightFieldCount) { + joinConditions.clear(); + throw new Util.FoundOne(call); + } + /* Both columns reference same table */ + if ((leftFieldAccess.getIndex() >= leftFieldCount && + rightFieldAccess.getIndex() >= leftFieldCount) || + (leftFieldAccess.getIndex() < leftFieldCount && + rightFieldAccess.getIndex() < leftFieldCount)) { + joinConditions.clear(); + throw new Util.FoundOne(call); + } else { + if (leftFieldAccess.getIndex() < leftFieldCount) { + joinConditions.add(Pair.of(leftFieldAccess.getIndex(), + rightFieldAccess.getIndex() - leftFieldCount)); + } else { + joinConditions.add(Pair.of(rightFieldAccess.getIndex(), + leftFieldAccess.getIndex() - leftFieldCount)); + } + } + } + } + return null; + } + }; + join.getCondition().accept(visitor); + } catch (Util.FoundOne ex) { + Util.swallow(ex, null); + } + return joinConditions; + } + + public static List<RexInputRef> findAllRexInputRefs(final RexNode node) { + List<RexInputRef> rexRefs = new ArrayList<>(); + RexVisitor<Void> visitor = + new RexVisitorImpl<Void>(true) { + public Void visitInputRef(RexInputRef inputRef) { + rexRefs.add(inputRef); + return super.visitInputRef(inputRef); + } + }; + node.accept(visitor); + return rexRefs; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java index 78e87d0..7565abf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java @@ -483,7 +483,7 @@ public class DrillStatsTable { 
Map<StatisticsKind, Object> statisticsValues = new HashMap<>(); Double ndv = statsProvider.getNdv(fieldName); if (ndv != null) { - statisticsValues.put(ColumnStatisticsKind.NVD, ndv); + statisticsValues.put(ColumnStatisticsKind.NDV, ndv); } Double nonNullCount = statsProvider.getNNRowCount(fieldName); if (nonNullCount != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java index 37cd55a..8b11a9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java @@ -17,14 +17,18 @@ */ package org.apache.drill.exec.planner.cost; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.SingleRel; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.Window; import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMdDistinctRowCount; @@ -33,23 +37,25 @@ import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.util.BuiltInMethod; import org.apache.calcite.util.ImmutableBitSet; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.planner.common.DrillJoinRelBase; import org.apache.drill.exec.planner.common.DrillRelOptUtil; -import org.apache.drill.exec.planner.common.DrillScanRelBase; import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.util.Utilities; import org.apache.drill.metastore.ColumnStatistics; import org.apache.drill.metastore.ColumnStatisticsKind; import org.apache.drill.metastore.TableMetadata; -import java.io.IOException; -import org.apache.drill.exec.util.Utilities; - public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{ private static final DrillRelMdDistinctRowCount INSTANCE = new DrillRelMdDistinctRowCount(); @@ -80,10 +86,10 @@ public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{ @Override public Double getDistinctRowCount(RelNode rel, RelMetadataQuery mq, ImmutableBitSet groupKey, RexNode predicate) { - if (rel instanceof DrillScanRelBase) { // Applies to both Drill Logical and Physical Rels + if (rel instanceof TableScan) { // Applies to Calcite/Drill logical and Drill physical rels if (!DrillRelOptUtil.guessRows(rel)) { DrillTable table = Utilities.getDrillTable(rel.getTable()); - return getDistinctRowCountInternal(((DrillScanRelBase) rel), mq, table, groupKey, rel.getRowType(), predicate); + return 
getDistinctRowCountInternal(((TableScan) rel), mq, table, groupKey, rel.getRowType(), predicate); } else { /* If we are not using statistics OR there is no table or metadata (stats) table associated with scan, * estimate the distinct row count. Consistent with the estimation of Aggregate row count in @@ -124,7 +130,7 @@ public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{ * set of columns indicated by groupKey. * column"). */ - private Double getDistinctRowCountInternal(DrillScanRelBase scan, RelMetadataQuery mq, DrillTable table, + private Double getDistinctRowCountInternal(TableScan scan, RelMetadataQuery mq, DrillTable table, ImmutableBitSet groupKey, RelDataType type, RexNode predicate) { double selectivity, rowCount; /* If predicate is present, determine its selectivity to estimate filtered rows. @@ -136,7 +142,6 @@ public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{ if (groupKey.length() == 0) { return selectivity * rowCount; } - /* If predicate is present, determine its selectivity to estimate filtered rows. Thereafter, * compute the number of distinct rows */ @@ -148,29 +153,39 @@ public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{ // Statistics cannot be obtained, use default behaviour return scan.estimateRowCount(mq) * 0.1; } - double s = 1.0; + double s = 1.0; + boolean allCols = true; for (int i = 0; i < groupKey.length(); i++) { final String colName = type.getFieldNames().get(i); // Skip NDV, if not available if (!groupKey.get(i)) { - continue; + allCols = false; + break; } - ColumnStatistics columnStatistics = tableMetadata != null ? tableMetadata.getColumnStatistics(SchemaPath.getSimplePath(colName)) : null; - Double ndv = columnStatistics != null ? (Double) columnStatistics.getStatistic(ColumnStatisticsKind.NVD) : null; + ColumnStatistics columnStatistics = tableMetadata != null ? + tableMetadata.getColumnStatistics(SchemaPath.getSimplePath(colName)) : null; + Double ndv = columnStatistics != null ? (Double) columnStatistics.getStatistic(ColumnStatisticsKind.NDV) : null; if (ndv == null) { continue; } - s *= 1 - ndv / rowCount; + s *= ndv; + selectivity = getPredSelectivityContainingInputRef(predicate, i, mq, scan); + /* If predicate is on group-by column, scale down the NDV by selectivity. Consider the query + * select a, b from t where a = 10 group by a, b. Here, NDV(a) will be scaled down by SEL(a) + * whereas NDV(b) will not. + */ + if (selectivity > 0) { + s *= selectivity; + } } - if (s > 0 && s < 1.0) { - return (1 - s) * selectivity * rowCount; - } else if (s == 1.0) { + s = Math.min(s, rowCount); + if (!allCols) { // Could not get any NDV estimate from stats - probably stats not present for GBY cols. So Guess! 
return scan.estimateRowCount(mq) * 0.1; } else { - /* rowCount maybe less than NDV(different source), sanity check OR NDV not used at all */ - return selectivity * rowCount; + /* rowCount maybe less than NDV(different source), sanity check OR NDV not used at all */ + return s; } } @@ -195,7 +210,7 @@ public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{ if (predicate != null) { List<RexNode> leftFilters = new ArrayList<>(); List<RexNode> rightFilters = new ArrayList<>(); - List<RexNode> joinFilters = new ArrayList<>(); + List<RexNode> joinFilters = new ArrayList(); List<RexNode> predList = RelOptUtil.conjunctions(predicate); RelOptUtil.classifyFilters(joinRel, predList, joinType, joinType == JoinRelType.INNER, !joinType.generatesNullsOnLeft(), !joinType.generatesNullsOnRight(), joinFilters, @@ -205,28 +220,122 @@ public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{ rightPred = RexUtil.composeConjunction(rexBuilder, rightFilters, true); } - Double leftDistRowCount = null; - Double rightDistRowCount = null; double distRowCount = 1; - ImmutableBitSet lmb = leftMask.build(); - ImmutableBitSet rmb = rightMask.build(); - // Get NDV estimates for the left and right side predicates, if applicable - if (lmb.length() > 0) { - leftDistRowCount = mq.getDistinctRowCount(left, lmb, leftPred); - if (leftDistRowCount != null) { - distRowCount = leftDistRowCount; + int gbyCols = 0; + PlannerSettings plannerSettings = PrelUtil.getPlannerSettings(joinRel.getCluster().getPlanner()); + /* + * The NDV for a multi-column GBY key past a join is determined as follows: + * GBY(s1, s2, s3) = CNDV(s1)*CNDV(s2)*CNDV(s3) + * where CNDV is determined as follows: + * A) If sX is present as a join column (sX = tX) CNDV(sX) = MIN(NDV(sX), NDV(tX)) where X =1, 2, 3, etc + * B) Otherwise, based on independence assumption CNDV(sX) = NDV(sX) + */ + Set<ImmutableBitSet> joinFiltersSet = new HashSet<>(); + for (RexNode filter : RelOptUtil.conjunctions(joinRel.getCondition())) { + final RelOptUtil.InputFinder inputFinder = RelOptUtil.InputFinder.analyze(filter); + joinFiltersSet.add(inputFinder.inputBitSet.build()); + } + for (int idx = 0; idx < groupKey.length(); idx++) { + if (groupKey.get(idx)) { + // GBY key is present in some filter - now try options A) and B) as described above + double ndvSGby = Double.MAX_VALUE; + boolean presentInFilter = false; + ImmutableBitSet sGby = getSingleGbyKey(groupKey, idx); + if (sGby != null) { + for (ImmutableBitSet jFilter : joinFiltersSet) { + if (jFilter.contains(sGby)) { + presentInFilter = true; + // Found join condition containing this GBY key. 
Pick min NDV across all columns in this join + for (int fidx : jFilter) { + if (fidx < left.getRowType().getFieldCount()) { + ndvSGby = Math.min(ndvSGby, mq.getDistinctRowCount(left, ImmutableBitSet.of(fidx), leftPred)); + } else { + ndvSGby = Math.min(ndvSGby, mq.getDistinctRowCount(right, ImmutableBitSet.of(fidx-left.getRowType().getFieldCount()), rightPred)); + } + } + break; + } + } + // Did not find it in any join condition(s) + if (!presentInFilter) { + for (int sidx : sGby) { + if (sidx < left.getRowType().getFieldCount()) { + ndvSGby = mq.getDistinctRowCount(left, ImmutableBitSet.of(sidx), leftPred); + } else { + ndvSGby = mq.getDistinctRowCount(right, ImmutableBitSet.of(sidx-left.getRowType().getFieldCount()), rightPred); + } + } + } + ++gbyCols; + // Multiply NDV(s) of different GBY cols to determine the overall NDV + distRowCount *= ndvSGby; + } } } - if (rmb.length() > 0) { - rightDistRowCount = mq.getDistinctRowCount(right, rmb, rightPred); - if (rightDistRowCount != null) { - distRowCount = rightDistRowCount; + if (gbyCols > 1) { // Scale with multi-col NDV factor if more than one GBY cols were found + distRowCount *= plannerSettings.getStatisticsMultiColNdvAdjustmentFactor(); + } + double joinRowCount = mq.getRowCount(joinRel); + // Cap NDV to join row count + distRowCount = Math.min(distRowCount, joinRowCount); + return RelMdUtil.numDistinctVals(distRowCount, joinRowCount); + } + + private ImmutableBitSet getSingleGbyKey(ImmutableBitSet groupKey, int idx) { + if (groupKey.get(idx)) { + return ImmutableBitSet.builder().set(idx, idx+1).build(); + } else { + return null; + } + } + + private double getPredSelectivityContainingInputRef(RexNode predicate, int inputRef, + RelMetadataQuery mq, TableScan scan) { + if (predicate instanceof RexCall) { + if (predicate.getKind() == SqlKind.AND) { + double sel, andSel = 1.0; + for (RexNode op : ((RexCall) predicate).getOperands()) { + sel = getPredSelectivityContainingInputRef(op, inputRef, mq, scan); + if (sel > 0) { + andSel *= sel; + } + } + return andSel; + } else if (predicate.getKind() == SqlKind.OR) { + double sel, orSel = 0.0; + for (RexNode op : ((RexCall) predicate).getOperands()) { + sel = getPredSelectivityContainingInputRef(op, inputRef, mq, scan); + if (sel > 0) { + orSel += sel; + } + } + return orSel; + } else { + for (RexNode op : ((RexCall) predicate).getOperands()) { + if (op instanceof RexInputRef && inputRef != ((RexInputRef) op).getIndex()) { + return -1.0; + } + } + return mq.getSelectivity(scan, predicate); } + } else { + return -1.0; } - // Use max of NDVs from both sides of the join, if applicable - if (leftDistRowCount != null && rightDistRowCount != null) { - distRowCount = Math.max(leftDistRowCount, rightDistRowCount); + } + + @Override + public Double getDistinctRowCount(RelSubset rel, RelMetadataQuery mq, + ImmutableBitSet groupKey, RexNode predicate) { + if (!DrillRelOptUtil.guessRows(rel)) { + final RelNode best = rel.getBest(); + if (best != null) { + return mq.getDistinctRowCount(best, groupKey, predicate); + } + final RelNode original = rel.getOriginal(); + if (original != null) { + return mq.getDistinctRowCount(original, groupKey, predicate); + } } - return RelMdUtil.numDistinctVals(distRowCount, mq.getRowCount(joinRel)); + return super.getDistinctRowCount(rel, mq, groupKey, predicate); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java index 814a96d..b65c582 
100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.planner.cost; import java.io.IOException; -import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.SingleRel; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.Filter; @@ -35,7 +34,9 @@ import org.apache.calcite.util.BuiltInMethod; import org.apache.calcite.util.ImmutableBitSet; import org.apache.drill.exec.planner.common.DrillLimitRelBase; import org.apache.drill.exec.planner.common.DrillRelOptUtil; +import org.apache.drill.exec.planner.common.DrillScanRelBase; import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.planner.physical.AggPrelBase; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.util.Utilities; @@ -53,7 +54,21 @@ public class DrillRelMdRowCount extends RelMdRowCount{ if (groupKey.isEmpty()) { return 1.0; - } else { + } else if (!DrillRelOptUtil.guessRows(rel) && + rel instanceof AggPrelBase && + ((AggPrelBase) rel).getOperatorPhase() == AggPrelBase.OperatorPhase.PHASE_1of2) { + // Phase 1 Aggregate would return rows in the range [NDV, input_rows]. Hence, use the + // existing estimate of 1/10 * input_rows + Double rowCount = mq.getRowCount(rel.getInput()) / 10; + Double ndv = mq.getDistinctRowCount(rel.getInput(), groupKey, null); + // Use max of NDV and input_rows/10 + if (ndv != null) { + rowCount = Math.max(ndv, rowCount); + } + // Grouping sets multiply + rowCount *= rel.getGroupSets().size(); + return rowCount; + } else { return super.getRowCount(rel, mq); } } @@ -88,20 +103,13 @@ public class DrillRelMdRowCount extends RelMdRowCount{ } @Override - public Double getRowCount(RelNode rel, RelMetadataQuery mq) { - if (rel instanceof TableScan) { - return getRowCountInternal((TableScan)rel, mq); - } - return super.getRowCount(rel, mq); - } - - @Override public Double getRowCount(Filter rel, RelMetadataQuery mq) { // Need capped selectivity estimates. See the Filter getRows() method return rel.getRows(); } - private Double getRowCountInternal(TableScan rel, RelMetadataQuery mq) { + @Override + public Double getRowCount(TableScan rel, RelMetadataQuery mq) { DrillTable table = Utilities.getDrillTable(rel.getTable()); PlannerSettings settings = PrelUtil.getSettings(rel.getCluster()); // If guessing, return selectivity from RelMDRowCount @@ -112,14 +120,19 @@ public class DrillRelMdRowCount extends RelMdRowCount{ try { if (table != null && table.getGroupScan().getTableMetadata() != null - && (boolean) TableStatisticsKind.HAS_STATISTICS.getValue(table.getGroupScan().getTableMetadata()) + && (boolean) TableStatisticsKind.HAS_STATISTICS.getValue(table.getGroupScan().getTableMetadata())) { /* For GroupScan rely on accurate count from the scan, if available, instead of * statistics since partition pruning/filter pushdown might have occurred. * e.g. ParquetGroupScan returns accurate rowcount. The other way would be to * iterate over the rowgroups present in the GroupScan to compute the rowcount. 
*/ - && !(table.getGroupScan().getScanStats(settings).getGroupScanProperty().hasExactRowCount())) { - return (Double) TableStatisticsKind.EST_ROW_COUNT.getValue(table.getGroupScan().getTableMetadata()); + if (!table.getGroupScan().getScanStats(settings).getGroupScanProperty().hasExactRowCount()) { + return (Double) TableStatisticsKind.EST_ROW_COUNT.getValue(table.getGroupScan().getTableMetadata()); + } else { + if (!(rel instanceof DrillScanRelBase)) { + return table.getGroupScan().getScanStats(settings).getRecordCount(); + } + } } } catch (IOException ex) { return super.getRowCount(rel, mq); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java index d04b19e..533cd1e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java @@ -19,12 +19,12 @@ package org.apache.drill.exec.planner.cost; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.EnumSet; import java.util.Set; import java.util.Map; import java.util.HashMap; -import java.util.HashSet; import java.util.stream.Collectors; @@ -180,7 +180,26 @@ public class DrillRelMdSelectivity extends RelMdSelectivity { double orSel = 0; for (RexNode orPred : RelOptUtil.disjunctions(pred)) { if (isMultiColumnPredicate(orPred) && !combinedRangePredicates.contains(orPred)) { - orSel += RelMdUtil.guessSelectivity(orPred); //CALCITE guess + Set uniqueRefs = new HashSet<>(); + uniqueRefs.add(DrillRelOptUtil.findAllRexInputRefs(orPred)); + // If equality predicate involving single column - selectivity is 1.0 + if (uniqueRefs.size() == 1) { + try { + RexVisitor<Void> visitor = + new RexVisitorImpl<Void>(true) { + public Void visitCall(RexCall call) { + if (call.getKind() != SqlKind.EQUALS) { + throw new Util.FoundOne(call); + } + return super.visitCall(call); + } + }; + pred.accept(visitor); + orSel += 1.0; + } catch (Util.FoundOne e) { + orSel += RelMdUtil.guessSelectivity(orPred); //CALCITE guess + } + } } else if (orPred.isA(SqlKind.EQUALS)) { orSel += computeEqualsSelectivity(tableMetadata, orPred, fieldNames); } else if (orPred.isA(RANGE_PREDICATE) || combinedRangePredicates.contains(orPred)) { @@ -276,7 +295,7 @@ public class DrillRelMdSelectivity extends RelMdSelectivity { SchemaPath col = getColumn(orPred, fieldNames); if (col != null) { ColumnStatistics columnStatistics = tableMetadata != null ? tableMetadata.getColumnStatistics(col) : null; - Double ndv = columnStatistics != null ? (Double) columnStatistics.getStatistic(ColumnStatisticsKind.NVD) : null; + Double ndv = columnStatistics != null ? 
(Double) columnStatistics.getStatistic(ColumnStatisticsKind.NDV) : null; if (ndv != null) { return 1.00 / ndv; } @@ -423,19 +442,6 @@ public class DrillRelMdSelectivity extends RelMdSelectivity { } private boolean isMultiColumnPredicate(final RexNode node) { - return findAllRexInputRefs(node).size() > 1; - } - - private static List<RexInputRef> findAllRexInputRefs(final RexNode node) { - List<RexInputRef> rexRefs = new ArrayList<>(); - RexVisitor<Void> visitor = - new RexVisitorImpl<Void>(true) { - public Void visitInputRef(RexInputRef inputRef) { - rexRefs.add(inputRef); - return super.visitInputRef(inputRef); - } - }; - node.accept(visitor); - return rexRefs; + return DrillRelOptUtil.findAllRexInputRefs(node).size() > 1; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index a5506d4..ee10a15 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -231,6 +231,8 @@ public class PlannerSettings implements Context{ public static final BooleanValidator STATISTICS_USE = new BooleanValidator("planner.statistics.use", null); + public static final RangeDoubleValidator STATISTICS_MULTICOL_NDV_ADJUST_FACTOR = new RangeDoubleValidator("planner.statistics.multicol_ndv_adjustment_factor", 0.0, 1.0, null); + public OptionManager options = null; public FunctionImplementationRegistry functionImplementationRegistry = null; @@ -475,6 +477,10 @@ public class PlannerSettings implements Context{ return options.getOption(STATISTICS_USE); } + public double getStatisticsMultiColNdvAdjustmentFactor() { + return options.getOption(STATISTICS_MULTICOL_NDV_ADJUST_FACTOR); + } + @Override public <T> T unwrap(Class<T> clazz) { if(clazz == PlannerSettings.class){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 61fefe7..4c28887 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -121,6 +121,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(PlannerSettings.ENABLE_UNNEST_LATERAL), new OptionDefinition(PlannerSettings.FORCE_2PHASE_AGGR), // for testing new OptionDefinition(PlannerSettings.STATISTICS_USE), + new OptionDefinition(PlannerSettings.STATISTICS_MULTICOL_NDV_ADJUST_FACTOR), new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR), new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)), new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java index 2446ba7..f440cf1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java @@ -105,7 +105,7 @@ public class Utilities { */ public static DrillTable getDrillTable(RelOptTable table) { DrillTable drillTable = table.unwrap(DrillTable.class); - if (drillTable == null) { 
+ if (drillTable == null && table.unwrap(DrillTranslatableTable.class) != null) { drillTable = table.unwrap(DrillTranslatableTable.class).getDrillTable(); } return drillTable; diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 5096680..ea254a5 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -591,6 +591,7 @@ drill.exec.options: { planner.producer_consumer_queue_size: 10, planner.slice_target: 100000, planner.statistics.use: false, + planner.statistics.multicol_ndv_adjustment_factor: 1.0, planner.store.parquet.rowgroup.filter.pushdown.enabled: true, planner.store.parquet.rowgroup.filter.pushdown.threshold: 10000, # Max per node should always be configured as zero and diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java index 94583de..1d404e1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java @@ -258,14 +258,14 @@ public class TestAnalyze extends BaseTestQuery { query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept" + " on emp.department_id = dept.department_id"; - String[] expectedPlan4 = {"HashJoin\\(condition.*\\).*rowcount = 1154.9999999999995,.*", + String[] expectedPlan4 = {"HashJoin\\(condition.*\\).*rowcount = 1155.0,.*", "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*", "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"}; PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan4, new String[]{}); query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept" + " on emp.department_id = dept.department_id where dept.department_id = 5"; - String[] expectedPlan5 = {"HashJoin\\(condition.*\\).*rowcount = 96.24999999999997,.*", + String[] expectedPlan5 = {"HashJoin\\(condition.*\\).*rowcount = 96.25,.*", "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*", "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"}; PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan5, new String[]{}); @@ -290,8 +290,8 @@ public class TestAnalyze extends BaseTestQuery { query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept" + " on emp.department_id = dept.department_id " + " group by emp.employee_id"; - String[] expectedPlan8 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 730.0992454469839,.*", - "HashJoin\\(condition.*\\).*rowcount = 1154.9999999999995,.*", + String[] expectedPlan8 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 115.49475630811243,.*", + "HashJoin\\(condition.*\\).*rowcount = 1155.0,.*", "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*", "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"}; PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan8, new String[]{}); @@ -301,8 +301,8 @@ public class TestAnalyze extends BaseTestQuery { + " on emp.department_id = dept.department_id " + " group by emp.employee_id, emp.store_id, dept.department_description " + " having dept.department_description = 'FINANCE'"; - String[] expectedPlan9 = {"HashAgg\\(group=\\[\\{0, 1, 2\\}\\]\\).*rowcount = 92.3487011031316.*", - "HashJoin\\(condition.*\\).*rowcount = 
96.24999999999997,.*", + String[] expectedPlan9 = {"HashAgg\\(group=\\[\\{0, 1, 2\\}\\]\\).*rowcount = 60.84160378724867.*", + "HashJoin\\(condition.*\\).*rowcount = 96.25,.*", "Scan.*columns=\\[`department_id`, `employee_id`, `store_id`\\].*rowcount = 1155.0.*", "Filter\\(condition=\\[=\\(\\$1, 'FINANCE'\\)\\]\\).*rowcount = 1.0,.*", "Scan.*columns=\\[`department_id`, `department_description`\\].*rowcount = 12.0.*"}; diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java index f1c8196..51195f4 100644 --- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java +++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java @@ -145,7 +145,7 @@ public enum ColumnStatisticsKind implements CollectableColumnStatisticsKind { /** * Column statistics kind which represents number of distinct values for the specific column. */ - NVD(Statistic.NDV) { + NDV(Statistic.NDV) { @Override public Object mergeStatistics(List<? extends ColumnStatistics> statisticsList) { throw new UnsupportedOperationException("Cannot merge statistics for NDV");

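Similarly, a minimal sketch of the multi-column group-by NDV rule that DrillRelMdDistinctRowCount now applies above a join: the product of per-column NDVs (a column appearing in a join condition sX = tX contributes MIN(NDV(sX), NDV(tX))), scaled by planner.statistics.multicol_ndv_adjustment_factor when more than one group-by column is present, and capped at the join row count. The patch additionally smooths the capped value through Calcite's RelMdUtil.numDistinctVals, which this sketch omits; the class and inputs are hypothetical.

```java
// Illustrative sketch only; restates the estimation rule, not the actual Drill implementation.
public class GroupByNdvOverJoinSketch {

  /**
   * @param perColumnNdv           per-group-by-column NDV contributions (already taking the
   *                               MIN with the opposite join-key NDV where applicable)
   * @param multiColAdjustmentFactor planner.statistics.multicol_ndv_adjustment_factor (default 1.0)
   * @param joinRowCount           estimated row count of the join feeding the aggregate
   */
  static double estimateGroupByNdv(double[] perColumnNdv, double multiColAdjustmentFactor,
                                   double joinRowCount) {
    double ndv = 1.0;
    for (double colNdv : perColumnNdv) {
      ndv *= colNdv;                       // independence assumption across group-by columns
    }
    if (perColumnNdv.length > 1) {
      ndv *= multiColAdjustmentFactor;     // damp the multi-column product
    }
    return Math.min(ndv, joinRowCount);    // NDV can never exceed the join cardinality
  }

  public static void main(String[] args) {
    // Hypothetical: two group-by columns with NDVs 110 and 24, default factor 1.0,
    // join producing 1155 rows -> min(110 * 24 * 1.0, 1155) = 1155.0
    System.out.println(estimateGroupByNdv(new double[] {110.0, 24.0}, 1.0, 1155.0));
  }
}
```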