DRILL-6099: Push limit past flatten(project) without pushdown into scan closes #1096
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6a55b2b2 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6a55b2b2 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6a55b2b2 Branch: refs/heads/master Commit: 6a55b2b21e047ba44f8f2d19381f18ae44263e26 Parents: 6af651f Author: Gautam Parai <[email protected]> Authored: Thu Jan 18 15:46:42 2018 -0800 Committer: Arina Ielchiieva <[email protected]> Committed: Sat Mar 3 19:47:50 2018 +0200 ---------------------------------------------------------------------- .../apache/drill/exec/planner/PlannerPhase.java | 8 ++- .../exec/planner/common/DrillRelOptUtil.java | 63 ++++++++++++++++++++ .../exec/planner/logical/DrillLimitRel.java | 6 +- .../logical/DrillPushLimitToScanRule.java | 41 +++++++++---- .../planner/sql/handlers/FindLimit0Visitor.java | 20 +------ .../impl/flatten/TestFlattenPlanning.java | 7 +++ 6 files changed, 112 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java index 18dfb35..f46a7ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java @@ -88,7 +88,7 @@ import java.util.List; public enum PlannerPhase { //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuleSets.class); - LOGICAL_PRUNE_AND_JOIN("Loigcal Planning (with join and partition pruning)") { + LOGICAL_PRUNE_AND_JOIN("Logical Planning (with join and partition pruning)") { public RuleSet 
getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) { return PlannerPhase.mergedRuleSets( getDrillBasicRules(context), @@ -274,6 +274,7 @@ public enum PlannerPhase { */ DrillPushProjectPastFilterRule.INSTANCE, DrillPushProjectPastJoinRule.INSTANCE, + // Due to infinite loop in planning (DRILL-3257), temporarily disable this rule //DrillProjectSetOpTransposeRule.INSTANCE, RuleInstance.PROJECT_WINDOW_TRANSPOSE_RULE, @@ -342,8 +343,9 @@ public enum PlannerPhase { PruneScanRule.getDirFilterOnScan(optimizerRulesContext), ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext), ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext), - DrillPushLimitToScanRule.LIMIT_ON_SCAN, - DrillPushLimitToScanRule.LIMIT_ON_PROJECT + // Include LIMIT_ON_PROJECT since LIMIT_ON_SCAN may not work without it + DrillPushLimitToScanRule.LIMIT_ON_PROJECT, + DrillPushLimitToScanRule.LIMIT_ON_SCAN ) .build(); http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java index 91c33bd..d5c8d94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java @@ -30,9 +30,11 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexVisitor; import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlKind; import 
org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.util.Pair; @@ -224,4 +226,65 @@ public abstract class DrillRelOptUtil { } } + public static boolean isLimit0(RexNode fetch) { + if (fetch != null && fetch.isA(SqlKind.LITERAL)) { + RexLiteral l = (RexLiteral) fetch; + switch (l.getTypeName()) { + case BIGINT: + case INTEGER: + case DECIMAL: + if (((long) l.getValue2()) == 0) { + return true; + } + } + } + return false; + } + + /** + * Find whether the given project rel can produce non-scalar output (hence unknown rowcount). This + * would happen if the project has a flatten. + * @param project : The project rel + * @return : Return true if the rowcount is unknown. Otherwise, false. + */ + public static boolean isProjectOutputRowcountUnknown(RelNode project) { + assert project instanceof Project : "Rel is NOT an instance of project!"; + for (RexNode rex : project.getChildExps()) { + if (rex instanceof RexCall) { + if ("flatten".equals(((RexCall) rex).getOperator().getName().toLowerCase())) { + return true; + } + } + } + return false; + } + + /** + * Find whether the given project rel has unknown output schema. This would happen if the + * project has CONVERT_FROMJSON, which can only derive the schema after evaluation is performed. + * @param project : The project rel + * @return : Return true if the project output schema is unknown. Otherwise, false.
+ */ + public static boolean isProjectOutputSchemaUnknown(RelNode project) { + assert project instanceof Project : "Rel is NOT an instance of project!"; + try { + RexVisitor<Void> visitor = + new RexVisitorImpl<Void>(true) { + public Void visitCall(RexCall call) { + if ("convert_fromjson".equals(call.getOperator().getName().toLowerCase())) { + throw new Util.FoundOne(call); /* throw exception to interrupt tree walk (this is similar to + other utility methods in RexUtil.java) */ + } + return super.visitCall(call); + } + }; + for (RexNode rex : ((Project) project).getProjects()) { + rex.accept(visitor); + } + } catch (Util.FoundOne e) { + Util.swallow(e, null); + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java index 9faf070..bef8b2b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java @@ -38,9 +38,13 @@ public class DrillLimitRel extends DrillLimitRelBase implements DrillRel { super(cluster, traitSet, child, offset, fetch); } + public DrillLimitRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown) { + super(cluster, traitSet, child, offset, fetch, pushDown); + } + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch); + return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown()); } @Override
http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java index 9c06897..068252d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java @@ -23,7 +23,14 @@ import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexVisitor; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.util.Util; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.common.DrillRelOptUtil; public abstract class DrillPushLimitToScanRule extends RelOptRule { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillPushLimitToScanRule.class); @@ -55,18 +62,21 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule { } }; - public static DrillPushLimitToScanRule LIMIT_ON_PROJECT = - new DrillPushLimitToScanRule( - RelOptHelper.some(DrillLimitRel.class, RelOptHelper.some( - DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), - "DrillPushLimitToScanRule_LimitOnProject") { + public static DrillPushLimitToScanRule LIMIT_ON_PROJECT = new DrillPushLimitToScanRule( + RelOptHelper.some(DrillLimitRel.class, RelOptHelper.any(DrillProjectRel.class)), "DrillPushLimitToScanRule_LimitOnProject") { @Override 
public boolean matches(RelOptRuleCall call) { DrillLimitRel limitRel = call.rel(0); - DrillScanRel scanRel = call.rel(2); - // For now only applies to Parquet. And pushdown only apply limit but not offset, + DrillProjectRel projectRel = call.rel(1); + // Pushdown only applies limit but not offset, // so if getFetch() return null no need to run this rule. - if (scanRel.getGroupScan().supportsLimitPushdown() && (limitRel.getFetch() != null)) { + // Do not push across Project containing CONVERT_FROMJSON for limit 0 queries. For limit 0 queries, this would + // mess up the schema since Convert_FromJson() is different from other regular functions in that it only knows + // the output schema after evaluation is performed. When input has 0 rows, Drill essentially does not have a way + // to know the output type. + if (!limitRel.isPushDown() && (limitRel.getFetch() != null) + && (!DrillRelOptUtil.isLimit0(limitRel.getFetch()) + || !DrillRelOptUtil.isProjectOutputSchemaUnknown(projectRel))) { return true; } return false; @@ -76,12 +86,20 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule { public void onMatch(RelOptRuleCall call) { DrillLimitRel limitRel = call.rel(0); DrillProjectRel projectRel = call.rel(1); - DrillScanRel scanRel = call.rel(2); - doOnMatch(call, limitRel, scanRel, projectRel); + RelNode child = projectRel.getInput(); + final RelNode limitUnderProject = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of(child)); + final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of(limitUnderProject)); + if (DrillRelOptUtil.isProjectOutputRowcountUnknown(projectRel)) { + // Preserve limit above the project since Flatten can produce more rows. Also mark it so we do not fire the rule again.
+ final RelNode limitAboveProject = new DrillLimitRel(limitRel.getCluster(), limitRel.getTraitSet(), newProject, + limitRel.getOffset(), limitRel.getFetch(), true); + call.transformTo(limitAboveProject); + } else { + call.transformTo(newProject); + } } }; - protected void doOnMatch(RelOptRuleCall call, DrillLimitRel limitRel, DrillScanRel scanRel, DrillProjectRel projectRel){ try { final int rowCountRequested = (int) limitRel.getRows(); @@ -113,6 +131,5 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule { } catch (Exception e) { logger.warn("Exception while using the pruned partitions.", e); } - } } http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java index 166c350..03d5f75 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java @@ -41,6 +41,7 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.logical.DrillDirectScanRel; import org.apache.drill.exec.planner.logical.DrillLimitRel; import org.apache.drill.exec.planner.logical.DrillRel; @@ -135,24 +136,9 @@ public class FindLimit0Visitor extends RelShuttleImpl { return contains; } - private static boolean isLimit0(RexNode fetch) { - if (fetch != null && fetch.isA(SqlKind.LITERAL)) { - RexLiteral l = (RexLiteral) fetch; - 
switch (l.getTypeName()) { - case BIGINT: - case INTEGER: - case DECIMAL: - if (((long) l.getValue2()) == 0) { - return true; - } - } - } - return false; - } - @Override public RelNode visit(LogicalSort sort) { - if (isLimit0(sort.fetch)) { + if (DrillRelOptUtil.isLimit0(sort.fetch)) { contains = true; return sort; } @@ -163,7 +149,7 @@ public class FindLimit0Visitor extends RelShuttleImpl { @Override public RelNode visit(RelNode other) { if (other instanceof DrillLimitRel) { - if (isLimit0(((DrillLimitRel) other).getFetch())) { + if (DrillRelOptUtil.isLimit0(((DrillLimitRel) other).getFetch())) { contains = true; return other; } http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java index 1a5117f..0a28d69 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java @@ -63,5 +63,12 @@ public class TestFlattenPlanning extends PlanTestBase { PlanTestBase.testPlanMatchingPatterns(query, expectedPlans, excludedPlans); } + @Test // DRILL-6099 : push limit past flatten(project) + public void testLimitPushdownPastFlatten() throws Exception { + final String query = "select rownum, flatten(complex) comp from cp.`store/json/test_flatten_mappify2.json` limit 1"; + final String[] expectedPatterns = {".*Limit\\(fetch=\\[1\\]\\).*",".*Flatten.*",".*Limit\\(fetch=\\[1\\]\\).*"}; + final String[] excludedPatterns = null; + PlanTestBase.testPlanMatchingPatterns(query, expectedPatterns, excludedPatterns); + } }
