vvysotskyi closed pull request #1386: DRILL-6574: Add option to push LIMIT(0) 
on top of SCAN (late limit 0 optimization)
URL: https://github.com/apache/drill/pull/1386
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index d0842d2996d..282ad30bd54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -453,6 +453,9 @@ private ExecConstants() {
   public static final String EARLY_LIMIT0_OPT_KEY = 
"planner.enable_limit0_optimization";
   public static final BooleanValidator EARLY_LIMIT0_OPT = new 
BooleanValidator(EARLY_LIMIT0_OPT_KEY);
 
+  public static final String LATE_LIMIT0_OPT_KEY = 
"planner.enable_limit0_on_scan";
+  public static final BooleanValidator LATE_LIMIT0_OPT = new 
BooleanValidator(LATE_LIMIT0_OPT_KEY);
+
   public static final String ENABLE_MEMORY_ESTIMATION_KEY = 
"planner.memory.enable_memory_estimation";
   public static final OptionValidator ENABLE_MEMORY_ESTIMATION = new 
BooleanValidator(ENABLE_MEMORY_ESTIMATION_KEY);
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 83e1a8f95e0..cc2ec609267 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -287,6 +287,9 @@ protected DrillRel convertToRawDrel(final RelNode relNode) 
throws SqlUnsupported
         if (FindLimit0Visitor.containsLimit0(convertedRelNodeWithSum0) &&
             
FindHardDistributionScans.canForceSingleMode(convertedRelNodeWithSum0)) {
           context.getPlannerSettings().forceSingleMode();
+          if (context.getOptions().getOption(ExecConstants.LATE_LIMIT0_OPT)) {
+            return FindLimit0Visitor.addLimitOnTopOfLeafNodes(drillRel);
+          }
         }
 
         return drillRel;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
index 8031609b898..8f44445e1eb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -19,17 +19,25 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
 import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalMinus;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rel.type.RelDataTypeField;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.types.TypeProtos;
@@ -42,6 +50,8 @@
 import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
 import org.apache.drill.exec.planner.logical.DrillLimitRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
@@ -49,6 +59,14 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.util.Pointer;
+
+import java.math.BigDecimal;
+import java.util.Set;
 
 /**
  * Visitor that will identify whether the root portion of the RelNode tree 
contains a limit 0 pattern. In this case, we
@@ -56,7 +74,6 @@
  * executing a schema-only query.
  */
 public class FindLimit0Visitor extends RelShuttleImpl {
-//  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FindLimit0Visitor.class);
 
   // Some types are excluded in this set:
   // + VARBINARY is not fully tested.
@@ -76,6 +93,25 @@
               SqlTypeName.INTERVAL_MINUTE_SECOND, SqlTypeName.INTERVAL_SECOND, 
SqlTypeName.CHAR, SqlTypeName.DECIMAL)
           .build();
 
+  private static final Set<String> unsupportedFunctions = 
ImmutableSet.<String>builder()
+      // see Mappify
+      .add("KVGEN")
+      .add("MAPPIFY")
+      // see DummyFlatten
+      .add("FLATTEN")
+      // see JsonConvertFrom
+      .add("CONVERT_FROMJSON")
+      // see JsonConvertTo class
+      .add("CONVERT_TOJSON")
+      .add("CONVERT_TOSIMPLEJSON")
+      .add("CONVERT_TOEXTENDEDJSON")
+      .build();
+
+  private boolean contains = false;
+
+  private FindLimit0Visitor() {
+  }
+
   /**
    * If all field types of the given node are {@link #TYPES recognized types} 
and honored by execution, then this
    * method returns the tree: DrillDirectScanRel(field types). Otherwise, the 
method returns null.
@@ -85,8 +121,7 @@
    */
   public static DrillRel getDirectScanRelIfFullySchemaed(RelNode rel) {
     final List<RelDataTypeField> fieldList = rel.getRowType().getFieldList();
-    final List<TypeProtos.MajorType> columnTypes = Lists.newArrayList();
-
+    final List<TypeProtos.MajorType> columnTypes = new ArrayList<>();
 
     for (final RelDataTypeField field : fieldList) {
       final SqlTypeName sqlTypeName = field.getType().getSqlTypeName();
@@ -126,9 +161,81 @@ public static boolean containsLimit0(final RelNode rel) {
     return visitor.isContains();
   }
 
-  private boolean contains = false;
+  public static DrillRel addLimitOnTopOfLeafNodes(final DrillRel rel) {
+    final Pointer<Boolean> isUnsupported = new Pointer<>(false);
+
+    // to visit unsupported functions
+    final RexShuttle unsupportedFunctionsVisitor = new RexShuttle() {
+      @Override
+      public RexNode visitCall(RexCall call) {
+        final SqlOperator operator = call.getOperator();
+        if (isUnsupportedScalarFunction(operator)) {
+          isUnsupported.value = true;
+          return call;
+        }
+        return super.visitCall(call);
+      }
+    };
+
+    // to visit unsupported operators
+    final RelShuttle unsupportedOperationsVisitor = new RelShuttleImpl() {
+      @Override
+      public RelNode visit(RelNode other) {
+        if (other instanceof DrillUnionRelBase) {
+          isUnsupported.value = true;
+          return other;
+        } else if (other instanceof DrillProjectRelBase) {
+          if (!isUnsupported.value) {
+            other.accept(unsupportedFunctionsVisitor);
+          }
+          if (isUnsupported.value) {
+            return other;
+          }
+        }
+        return super.visit(other);
+      }
+    };
 
-  private FindLimit0Visitor() {
+    rel.accept(unsupportedOperationsVisitor);
+    if (isUnsupported.value) {
+      return rel;
+    }
+
+    // to add LIMIT (0) on top of leaf nodes
+    final RelShuttle addLimitOnScanVisitor = new RelShuttleImpl() {
+
+      private RelNode addLimitAsParent(RelNode node) {
+        final RexBuilder builder = node.getCluster().getRexBuilder();
+        final RexLiteral offset = builder.makeExactLiteral(BigDecimal.ZERO);
+        final RexLiteral fetch = builder.makeExactLiteral(BigDecimal.ZERO);
+        return new DrillLimitRel(node.getCluster(), node.getTraitSet(), node, 
offset, fetch);
+      }
+
+      @Override
+      public RelNode visit(LogicalValues values) {
+        return addLimitAsParent(values);
+      }
+
+      @Override
+      public RelNode visit(TableScan scan) {
+        return addLimitAsParent(scan);
+      }
+
+      @Override
+      public RelNode visit(RelNode other) {
+        if (other.getInputs().isEmpty()) { // leaf operator
+          return addLimitAsParent(other);
+        }
+        return super.visit(other);
+      }
+    };
+
+    return (DrillRel) rel.accept(addLimitOnScanVisitor);
+  }
+
+  private static boolean isUnsupportedScalarFunction(final SqlOperator 
operator) {
+    return operator instanceof DrillSqlOperator &&
+        unsupportedFunctions.contains(operator.getName().toUpperCase());
   }
 
   boolean isContains() {
@@ -147,6 +254,11 @@ public RelNode visit(LogicalSort sort) {
 
   @Override
   public RelNode visit(RelNode other) {
+    if (other instanceof DrillJoinRelBase ||
+        other instanceof DrillAggregateRelBase ||
+        other instanceof DrillUnionRelBase) {
+      return other;
+    }
     if (other instanceof DrillLimitRel) {
       if (DrillRelOptUtil.isLimit0(((DrillLimitRel) other).getFetch())) {
         contains = true;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 5ee3825bd28..a627821dd37 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -189,6 +189,7 @@
       new OptionDefinition(ExecConstants.MIN_HASH_TABLE_SIZE),
       new OptionDefinition(ExecConstants.MAX_HASH_TABLE_SIZE),
       new OptionDefinition(ExecConstants.EARLY_LIMIT0_OPT),
+      new OptionDefinition(ExecConstants.LATE_LIMIT0_OPT),
       new OptionDefinition(ExecConstants.ENABLE_MEMORY_ESTIMATION),
       new OptionDefinition(ExecConstants.MAX_QUERY_MEMORY_PER_NODE),
       new OptionDefinition(ExecConstants.PERCENT_MEMORY_PER_QUERY),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index 19e779d16dc..16a285beac2 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -508,7 +508,8 @@ drill.exec.options: {
     planner.enable_hep_opt: true,
     planner.enable_hep_partition_pruning: true,
     planner.enable_join_optimization: true,
-    planner.enable_limit0_optimization: false,
+    planner.enable_limit0_optimization: true,
+    planner.enable_limit0_on_scan: true,
     planner.enable_mergejoin: true,
     planner.enable_multiphase_agg: true,
     planner.enable_mux_exchange: true,
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index febabfef205..e00e5dc60f3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -280,6 +280,8 @@ public void testMainQueryTrueCondition() throws Exception {
   public void testMainQueryFilterRegularColumn() throws Exception {
     String query = "select * from (select dir0, o_custkey from 
dfs.`multilevel/parquet` where dir0='1994' and o_custkey = 10) t limit 0";
     // with Parquet RG filter pushdown, reduce to 1 file ( o_custkey all > 10).
+    // There is a LIMIT(0) inserted on top of SCAN, so filter push down is not 
applied.
+    // Since this is a LIMIT 0 query, not pushing down the filter should not 
cause a perf. regression.
     testIncludeFilter(query, 1, "Filter\\(", 0);
   }
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java
new file mode 100644
index 00000000000..94fcab59d80
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
+import org.junit.Test;
+
+import static org.apache.drill.exec.ExecConstants.LATE_LIMIT0_OPT_KEY;
+
+public class TestLateLimit0Optimization extends BaseTestQuery {
+
+  @Test
+  public void convertFromJson() throws Exception {
+    checkThatQueryIsNotOptimized("SELECT CONVERT_FROM('{x:100, y:215.6}' 
,'JSON') AS MYCOL FROM (VALUES(1))");
+  }
+
+  private static void checkThatQueryIsNotOptimized(final String query) throws 
Exception {
+    PlanTestBase.testPlanMatchingPatterns(wrapLimit0(query),
+        null,
+        new String[] {".*Limit\\(offset=\\[0\\], 
fetch=\\[0\\]\\)(.*[\n\r])+.*Scan.*"});
+  }
+
+  private static String wrapLimit0(final String query) {
+    return "SELECT * FROM (" + query + ") LZT LIMIT 0";
+  }
+
+  @Test
+  public void convertToIntBE() throws Exception {
+    checkThatQueryIsOptimized("SELECT CONVERT_TO(r_regionkey, 'INT_BE') FROM 
cp.`tpch/region.parquet`");
+  }
+
+  private static void checkThatQueryIsOptimized(final String query) throws 
Exception {
+    PlanTestBase.testPlanMatchingPatterns(wrapLimit0(query),
+        new String[] {".*Limit\\(offset=\\[0\\], 
fetch=\\[0\\]\\)(.*[\n\r])+.*Scan.*"});
+  }
+
+  @Test
+  public void convertToOthers() throws Exception {
+    checkThatQueryIsOptimized("SELECT r_regionkey,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'INT')) as i,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'INT_BE')) as i_be,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'BIGINT')) as l,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'BIGINT')) as l_be,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_name, 'UTF8')) u8,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_name, 'UTF16')) u16,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'INT_HADOOPV')) as l_be\n" +
+        "FROM cp.`tpch/region.parquet`");
+  }
+
+  @Test
+  public void union() throws Exception {
+    checkThatQueryIsNotOptimized("(select n_regionkey from 
cp.`tpch/nation.parquet`) union " +
+        "(select r_regionname from cp.`tpch/region.parquet`)");
+  }
+
+  @Test
+  public void unionAll() throws Exception {
+    checkThatQueryIsNotOptimized("(select n_regionkey from 
cp.`tpch/nation.parquet`) union all " +
+        "(select r_regionname from cp.`tpch/region.parquet`)");
+  }
+
+  @Test
+  public void flatten() throws Exception {
+    checkThatQueryIsNotOptimized("select flatten(arr) as a from 
cp.`/flatten/drill-3370.json`");
+  }
+
+  @Test
+  public void flatten2() throws Exception {
+    checkThatQueryIsNotOptimized("select uid, lst_lst, d.lst_lst[1], 
flatten(d.lst_lst) lst " +
+        "from cp.`tpch/region.parquet` d order by d.lst_lst[1][2]"); // table 
is just for validation
+  }
+
+  @Test
+  public void flatten3() throws Exception {
+    checkThatQueryIsNotOptimized("select s.evnts.evnt_id from (select d.type 
type, flatten(d.events) evnts from " +
+        "cp.`tpch/region.parquet` d where d.type='web' order by d.uid) s " +
+        "where s.evnts.type = 'cmpgn4' and s.type='web'"); // table is just 
for validation
+  }
+
+  @Test
+  public void flatten4() throws Exception {
+    checkThatQueryIsNotOptimized("select flatten(lst) from (select uid, 
flatten(d.lst_lst) lst from " +
+        "cp.`tpch/region.parquet` d) s1 order by s1.lst[3]"); // table is just 
for validation
+  }
+
+  @Test
+  public void countDistinct() throws Exception {
+    checkThatQueryIsOptimized("SELECT COUNT(employee_id), " +
+            "SUM(employee_id), " +
+            "COUNT(DISTINCT employee_id) " +
+            "FROM cp.`employee.json`");
+  }
+
+  @Test
+  public void testLimit0IsAbsentWhenDisabled() throws Exception {
+    String query = "SELECT CONVERT_TO(r_regionkey, 'INT_BE') FROM 
cp.`tpch/region.parquet`";
+    try {
+      setSessionOption(LATE_LIMIT0_OPT_KEY, false);
+      PlanTestBase.testPlanMatchingPatterns(wrapLimit0(query), null, new 
String[] {".*Limit\\(offset=\\[0\\], fetch=\\[0\\]\\)(.*[\n\r])+.*Scan.*"});
+    } finally {
+      resetSessionOption(LATE_LIMIT0_OPT_KEY);
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to