This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit aa6a898a596ce9a8e2677ebc4987947bffc56dcb Author: Sudheesh Katkam <[email protected]> AuthorDate: Mon Mar 21 15:32:47 2016 -0700 DRILL-6574: Add option to push LIMIT(0) on top of SCAN (late limit 0 optimization) --- .../java/org/apache/drill/exec/ExecConstants.java | 3 + .../planner/sql/handlers/DefaultSqlHandler.java | 3 + .../planner/sql/handlers/FindLimit0Visitor.java | 119 +++++++++++++++++++++ .../exec/server/options/SystemOptionManager.java | 1 + .../java-exec/src/main/resources/drill-module.conf | 3 +- .../java/org/apache/drill/TestPartitionFilter.java | 4 +- .../impl/limit/TestLateLimit0Optimization.java | 112 +++++++++++++++++++ 7 files changed, 243 insertions(+), 2 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index d0842d2..282ad30 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -453,6 +453,9 @@ public final class ExecConstants { public static final String EARLY_LIMIT0_OPT_KEY = "planner.enable_limit0_optimization"; public static final BooleanValidator EARLY_LIMIT0_OPT = new BooleanValidator(EARLY_LIMIT0_OPT_KEY); + public static final String LATE_LIMIT0_OPT_KEY = "planner.enable_limit0_on_scan"; + public static final BooleanValidator LATE_LIMIT0_OPT = new BooleanValidator(LATE_LIMIT0_OPT_KEY); + public static final String ENABLE_MEMORY_ESTIMATION_KEY = "planner.memory.enable_memory_estimation"; public static final OptionValidator ENABLE_MEMORY_ESTIMATION = new BooleanValidator(ENABLE_MEMORY_ESTIMATION_KEY); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 83e1a8f..cc2ec60 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -287,6 +287,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler { if (FindLimit0Visitor.containsLimit0(convertedRelNodeWithSum0) && FindHardDistributionScans.canForceSingleMode(convertedRelNodeWithSum0)) { context.getPlannerSettings().forceSingleMode(); + if (context.getOptions().getOption(ExecConstants.LATE_LIMIT0_OPT)) { + return FindLimit0Visitor.addLimitOnTopOfLeafNodes(drillRel); + } } return drillRel; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java index 8031609..3746d7e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java @@ -22,14 +22,23 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttle; import org.apache.calcite.rel.RelShuttleImpl; +import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalIntersect; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalMinus; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.calcite.rel.logical.LogicalValues; import org.apache.calcite.rel.type.RelDataTypeField; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.types.TypeProtos; @@ -42,6 +51,8 @@ import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.logical.DrillDirectScanRel; import org.apache.drill.exec.planner.logical.DrillLimitRel; import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.common.DrillProjectRelBase; +import org.apache.drill.exec.planner.sql.DrillSqlOperator; import org.apache.drill.exec.planner.sql.TypeInferenceUtils; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.AbstractRecordReader; @@ -49,6 +60,14 @@ import org.apache.drill.exec.store.direct.DirectGroupScan; import java.util.ArrayList; import java.util.List; +import org.apache.calcite.rex.RexBuilder; +import org.apache.drill.exec.planner.common.DrillAggregateRelBase; +import org.apache.drill.exec.planner.common.DrillJoinRelBase; +import org.apache.drill.exec.planner.common.DrillUnionRelBase; +import org.apache.drill.exec.util.Pointer; + +import java.math.BigDecimal; +import java.util.Set; /** * Visitor that will identify whether the root portion of the RelNode tree contains a limit 0 pattern. In this case, we @@ -128,6 +147,100 @@ public class FindLimit0Visitor extends RelShuttleImpl { private boolean contains = false; + private static final Set<String> unsupportedFunctions = ImmutableSet.<String>builder() + // see Mappify + .add("KVGEN") + .add("MAPPIFY") + // see DummyFlatten + .add("FLATTEN") + // see JsonConvertFrom + .add("CONVERT_FROMJSON") + // see JsonConvertTo class + .add("CONVERT_TOJSON") + .add("CONVERT_TOSIMPLEJSON") + .add("CONVERT_TOEXTENDEDJSON") + .build(); + + private static boolean isUnsupportedScalarFunction(final SqlOperator operator) { + return operator instanceof DrillSqlOperator && + unsupportedFunctions.contains(operator.getName().toUpperCase()); + } + + /** + * TODO(DRILL-3993): Use RelBuilder to create a limit node to allow for applying this optimization in potentially + * any of the transformations, but currently this can be applied after Drill logical transformation, and before + * Drill physical transformation. + */ + public static DrillRel addLimitOnTopOfLeafNodes(final DrillRel rel) { + final Pointer<Boolean> isUnsupported = new Pointer<>(false); + + // to visit unsupported functions + final RexShuttle unsupportedFunctionsVisitor = new RexShuttle() { + @Override + public RexNode visitCall(RexCall call) { + final SqlOperator operator = call.getOperator(); + if (isUnsupportedScalarFunction(operator)) { + isUnsupported.value = true; + return call; + } + return super.visitCall(call); + } + }; + + // to visit unsupported operators + final RelShuttle unsupportedOperationsVisitor = new RelShuttleImpl() { + @Override + public RelNode visit(RelNode other) { + if (other instanceof DrillUnionRelBase) { + isUnsupported.value = true; + return other; + } else if (other instanceof DrillProjectRelBase) { + other.accept(unsupportedFunctionsVisitor); + if (isUnsupported.value) { + return other; + } + } + return super.visit(other); + } + }; + + rel.accept(unsupportedOperationsVisitor); + if (isUnsupported.value) { + return rel; + } + + // to add LIMIT (0) on top of leaf nodes + final RelShuttle addLimitOnScanVisitor = new RelShuttleImpl() { + + private RelNode addLimitAsParent(RelNode node) { + final RexBuilder builder = node.getCluster().getRexBuilder(); + final RexLiteral offset = builder.makeExactLiteral(BigDecimal.ZERO); + final RexLiteral fetch = builder.makeExactLiteral(BigDecimal.ZERO); + return new DrillLimitRel(node.getCluster(), node.getTraitSet(), node, offset, fetch); + } + + @Override + public RelNode visit(LogicalValues values) { + return addLimitAsParent(values); + } + + @Override + public RelNode visit(TableScan scan) { + return addLimitAsParent(scan); + } + + @Override + public RelNode visit(RelNode other) { + if (other.getInputs().size() == 0) { // leaf operator + return addLimitAsParent(other); + } + return super.visit(other); + } + }; + + return (DrillRel) rel.accept(addLimitOnScanVisitor); + } + private FindLimit0Visitor() { } @@ -147,6 +260,11 @@ public class FindLimit0Visitor extends RelShuttleImpl { @Override public RelNode visit(RelNode other) { + if (other instanceof DrillJoinRelBase || + other instanceof DrillAggregateRelBase || + other instanceof DrillUnionRelBase) { + return other; + } if (other instanceof DrillLimitRel) { if (DrillRelOptUtil.isLimit0(((DrillLimitRel) other).getFetch())) { contains = true; @@ -157,6 +275,7 @@ public class FindLimit0Visitor extends RelShuttleImpl { return super.visit(other); } + // TODO: The following nodes are never visited because this visitor is used after logical transformation! // The following set of RelNodes should terminate a search for the limit 0 pattern as they want convey its meaning. @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 5ee3825..a627821 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -189,6 +189,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.MIN_HASH_TABLE_SIZE), new OptionDefinition(ExecConstants.MAX_HASH_TABLE_SIZE), new OptionDefinition(ExecConstants.EARLY_LIMIT0_OPT), + new OptionDefinition(ExecConstants.LATE_LIMIT0_OPT), new OptionDefinition(ExecConstants.ENABLE_MEMORY_ESTIMATION), new OptionDefinition(ExecConstants.MAX_QUERY_MEMORY_PER_NODE), new OptionDefinition(ExecConstants.PERCENT_MEMORY_PER_QUERY), diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 19e779d..16a285b 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -508,7 +508,8 @@ drill.exec.options: { planner.enable_hep_opt: true, planner.enable_hep_partition_pruning: true, planner.enable_join_optimization: true, - planner.enable_limit0_optimization: false, + planner.enable_limit0_optimization: true, + planner.enable_limit0_on_scan: true, planner.enable_mergejoin: true, planner.enable_multiphase_agg: true, planner.enable_mux_exchange: true, diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java index febabfe..8b001f6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java @@ -280,7 +280,9 @@ public class TestPartitionFilter extends PlanTestBase { public void testMainQueryFilterRegularColumn() throws Exception { String query = "select * from (select dir0, o_custkey from dfs.`multilevel/parquet` where dir0='1994' and o_custkey = 10) t limit 0"; // with Parquet RG filter pushdown, reduce to 1 file ( o_custkey all > 10). - testIncludeFilter(query, 1, "Filter\\(", 0); + // There is a LIMIT(0) inserted on top of SCAN, so filter push down is not applied. + // Since this is a LIMIT 0 query, not pushing down the filter should not cause a perf. regression. + testIncludeFilter(query, 4, "Filter", 0); } @Test // see DRILL-2852 and DRILL-3591 diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java new file mode 100644 index 0000000..f819260 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.limit; + +import org.apache.drill.test.BaseTestQuery; +import org.apache.drill.PlanTestBase; +import org.junit.Test; + +public class TestLateLimit0Optimization extends BaseTestQuery { + + private static String wrapLimit0(final String query) { + return "SELECT * FROM (" + query + ") LZT LIMIT 0"; + } + + private static void checkThatQueryIsOptimized(final String query) throws Exception { + PlanTestBase.testPlanMatchingPatterns(wrapLimit0(query), + new String[]{ + ".*Limit\\(offset=\\[0\\], fetch=\\[0\\]\\)(.*[\n\r])+.*Scan.*" + }, new String[]{}); + } + + private static void checkThatQueryIsNotOptimized(final String query) throws Exception { + PlanTestBase.testPlanMatchingPatterns(wrapLimit0(query), + new String[]{}, + new String[]{ + ".*Limit\\(offset=\\[0\\], fetch=\\[0\\]\\)(.*[\n\r])+.*Scan.*" + }); + } + + @Test + public void convertFromJson() throws Exception { + checkThatQueryIsNotOptimized("SELECT CONVERT_FROM('{x:100, y:215.6}' ,'JSON') AS MYCOL FROM (VALUES(1))"); + } + + @Test + public void convertToIntBE() throws Exception { + checkThatQueryIsOptimized("SELECT CONVERT_TO(r_regionkey, 'INT_BE') FROM cp.`tpch/region.parquet`"); + } + + @Test + public void convertToOthers() throws Exception { + checkThatQueryIsOptimized("SELECT r_regionkey,\n" + + " STRING_BINARY(CONVERT_TO(r_regionkey, 'INT')) as i,\n" + + " STRING_BINARY(CONVERT_TO(r_regionkey, 'INT_BE')) as i_be,\n" + + " STRING_BINARY(CONVERT_TO(r_regionkey, 'BIGINT')) as l,\n" + + " STRING_BINARY(CONVERT_TO(r_regionkey, 'BIGINT')) as l_be,\n" + + " STRING_BINARY(CONVERT_TO(r_name, 'UTF8')) u8,\n" + + " STRING_BINARY(CONVERT_TO(r_name, 'UTF16')) u16,\n" + + " STRING_BINARY(CONVERT_TO(r_regionkey, 'INT_HADOOPV')) as l_be\n" + + "FROM cp.`tpch/region.parquet`"); + } + + @Test + public void union() throws Exception { + checkThatQueryIsNotOptimized("(select n_regionkey from cp.`tpch/nation.parquet`) union " + + "(select r_regionname from cp.`tpch/region.parquet`)"); + } + + @Test + public void unionAll() throws Exception { + checkThatQueryIsNotOptimized("(select n_regionkey from cp.`tpch/nation.parquet`) union all " + + "(select r_regionname from cp.`tpch/region.parquet`)"); + } + + @Test + public void flatten() throws Exception { + checkThatQueryIsNotOptimized("select flatten(arr) as a from cp.`/flatten/drill-3370.json`"); + } + + @Test + public void flatten2() throws Exception { + checkThatQueryIsNotOptimized("select uid, lst_lst, d.lst_lst[1], flatten(d.lst_lst) lst " + + "from cp.`tpch/region.parquet` d order by d.lst_lst[1][2]"); // table is just for validation + } + + @Test + public void flatten3() throws Exception { + checkThatQueryIsNotOptimized("select s.evnts.evnt_id from (select d.type type, flatten(d.events) evnts from " + + "cp.`tpch/region.parquet` d where d.type='web' order by d.uid) s " + + "where s.evnts.type = 'cmpgn4' and s.type='web'"); // table is just for validation + } + + @Test + public void flatten4() throws Exception { + checkThatQueryIsNotOptimized("select flatten(lst) from (select uid, flatten(d.lst_lst) lst from " + + "cp.`tpch/region.parquet` d) s1 order by s1.lst[3]"); // table is just for validation + } + + @Test + public void countDistinct() throws Exception { + checkThatQueryIsOptimized("SELECT COUNT(employee_id), " + + "SUM(employee_id), " + + "COUNT(DISTINCT employee_id) " + + "FROM cp.`employee.json`"); + } + +}
