This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fe31c3  DRILL-6374: Transitive Closure leads to TPCH Queries 
regressions and OOM when run concurrency test
1fe31c3 is described below

commit 1fe31c34ebf98bd913a91a8223b6f6b29c80f4e8
Author: Vitalii Diravka <[email protected]>
AuthorDate: Mon May 14 15:27:16 2018 +0300

    DRILL-6374: Transitive Closure leads to TPCH Queries regressions and OOM 
when run concurrency test
    
    - Test case for Directory Pruning with Transitive Predicates
    - Improving description for STRICT_EQUAL_IS_DISTINCT_FROM predicate
    
    closes #1262
---
 .../apache/drill/exec/planner/PlannerPhase.java    | 17 ++++---
 .../apache/drill/exec/planner/RuleInstance.java    | 12 ++++-
 .../exec/planner/logical/DrillFilterJoinRules.java | 55 +++++++++++++++-------
 .../logical/DrillPushFilterPastProjectRule.java    | 31 +++++++-----
 .../planner/sql/handlers/DefaultSqlHandler.java    | 23 +++++----
 .../drill/exec/store/AbstractStoragePlugin.java    |  4 +-
 .../java/org/apache/drill/TestPartitionFilter.java | 17 +++++++
 ...quetFilterPushdownWithTransitivePredicates.java |  5 +-
 8 files changed, 110 insertions(+), 54 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index b5e09ef..2a79751 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -265,8 +265,8 @@ public enum PlannerPhase {
       RuleInstance.UNION_TO_DISTINCT_RULE,
 
       // Add support for WHERE style joins.
-      DrillFilterJoinRules.DRILL_FILTER_ON_JOIN,
-      DrillFilterJoinRules.DRILL_JOIN,
+      DrillFilterJoinRules.FILTER_INTO_JOIN,
+      DrillFilterJoinRules.JOIN_PUSH_CONDITION,
       RuleInstance.JOIN_PUSH_EXPRESSIONS_RULE,
       // End support for WHERE style joins.
 
@@ -532,19 +532,18 @@ public enum PlannerPhase {
 
   /**
    * RuleSet for join transitive closure, used only in HepPlanner.<p>
-   * TODO: {@link RuleInstance#JOIN_PUSH_TRANSITIVE_PREDICATES_RULE} should be 
copied to #staticRuleSet,
-   * once CALCITE-1048 is solved. This still should be present in {@link 
#TRANSITIVE_CLOSURE} stage
-   * for applying additional filters before {@link #DIRECTORY_PRUNING}.
+   * TODO: {@link RuleInstance#DRILL_JOIN_PUSH_TRANSITIVE_PREDICATES_RULE} 
should be moved into {@link #staticRuleSet},
+   * (using {@link DrillRelFactories#LOGICAL_BUILDER}) once CALCITE-1048 
is solved. This block can be removed then.
    *
    * @return set of planning rules
    */
   static RuleSet getJoinTransitiveClosureRules() {
     return RuleSets.ofList(ImmutableSet.<RelOptRule> builder()
         .add(
-            DrillFilterJoinRules.DRILL_FILTER_ON_JOIN,
-            DrillFilterJoinRules.DRILL_JOIN,
-            RuleInstance.JOIN_PUSH_TRANSITIVE_PREDICATES_RULE,
-            RuleInstance.FILTER_MERGE_RULE
+            RuleInstance.DRILL_JOIN_PUSH_TRANSITIVE_PREDICATES_RULE,
+            DrillFilterJoinRules.DRILL_FILTER_INTO_JOIN,
+            RuleInstance.REMOVE_IS_NOT_DISTINCT_FROM_RULE,
+            RuleInstance.DRILL_FILTER_MERGE_RULE
         ).build());
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 986fdbd..80bbe88 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -29,6 +29,7 @@ import 
org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
 import org.apache.calcite.rel.rules.AggregateRemoveRule;
 import org.apache.calcite.rel.rules.FilterCorrelateRule;
 import org.apache.calcite.rel.rules.FilterMergeRule;
+import org.apache.calcite.rel.rules.FilterRemoveIsNotDistinctFromRule;
 import org.apache.calcite.rel.rules.FilterSetOpTransposeRule;
 import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
 import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
@@ -63,6 +64,9 @@ public interface RuleInstance {
   FilterMergeRule FILTER_MERGE_RULE =
       new FilterMergeRule(DrillRelFactories.LOGICAL_BUILDER);
 
+  FilterMergeRule DRILL_FILTER_MERGE_RULE =
+      new 
FilterMergeRule(DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
+
   FilterCorrelateRule FILTER_CORRELATE_RULE =
       new FilterCorrelateRule(DrillRelFactories.LOGICAL_BUILDER);
 
@@ -120,6 +124,10 @@ public interface RuleInstance {
    * {@link org.apache.calcite.rel.core.Filter}s if those predicates can be 
pushed
    * to its inputs.
    */
-  JoinPushTransitivePredicatesRule JOIN_PUSH_TRANSITIVE_PREDICATES_RULE =
-      new JoinPushTransitivePredicatesRule(Join.class, 
DrillRelFactories.LOGICAL_BUILDER);
+  JoinPushTransitivePredicatesRule DRILL_JOIN_PUSH_TRANSITIVE_PREDICATES_RULE =
+      new JoinPushTransitivePredicatesRule(Join.class, DrillRelBuilder.proto(
+          DrillRelFactories.DRILL_LOGICAL_JOIN_FACTORY, 
DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
+
+  FilterRemoveIsNotDistinctFromRule REMOVE_IS_NOT_DISTINCT_FROM_RULE =
+      new 
FilterRemoveIsNotDistinctFromRule(DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
index 6d186e1..5e24103 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
@@ -17,16 +17,15 @@
  */
 package org.apache.drill.exec.planner.logical;
 
-import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.rules.FilterJoinRule;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexNode;
 import org.apache.drill.exec.planner.DrillRelBuilder;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class DrillFilterJoinRules {
@@ -47,10 +46,10 @@ public class DrillFilterJoinRules {
                           // into LEFT/RIGHT.
           }
 
-          List<RexNode> tmpLeftKeys = Lists.newArrayList();
-          List<RexNode> tmpRightKeys = Lists.newArrayList();
-          List<RelDataTypeField> sysFields = Lists.newArrayList();
-          List<Integer> filterNulls = Lists.newArrayList();
+          List<RexNode> tmpLeftKeys = new ArrayList<>();
+          List<RexNode> tmpRightKeys = new ArrayList<>();
+          List<RelDataTypeField> sysFields = new ArrayList<>();
+          List<Integer> filterNulls = new ArrayList<>();
 
           RexNode remaining = RelOptUtil.splitJoinCondition(sysFields, 
join.getLeft(), join.getRight(),
               exp, tmpLeftKeys, tmpRightKeys, filterNulls, null);
@@ -59,20 +58,44 @@ public class DrillFilterJoinRules {
         }
       };
 
+  /** Predicate that always returns true for any filter in OUTER join, and 
only true
+   * for strict EQUAL or IS_DISTINCT_FROM conditions (without any mathematical 
operations) over RexInputRef in INNER join.
+   * With this predicate, the filter expressions that return true will be kept 
in the JOIN OP.
+   * Example:  INNER JOIN,   L.C1 = R.C2 will be kept in JOIN.
+   *                         L.C3 + 100 = R.C4 + 100, L.C5 < R.C6 will be 
pulled up into Filter above JOIN.
+   *           OUTER JOIN,   Keep any filter in JOIN.
+  */
+  public static final FilterJoinRule.Predicate STRICT_EQUAL_IS_DISTINCT_FROM =
+      new FilterJoinRule.Predicate() {
+        public boolean apply(Join join, JoinRelType joinType, RexNode exp) {
+          if (joinType != JoinRelType.INNER) {
+            return true;
+          }
+
+          List<Integer> tmpLeftKeys = new ArrayList<>();
+          List<Integer> tmpRightKeys = new ArrayList<>();
+          List<Boolean> filterNulls = new ArrayList<>();
+
+          RexNode remaining =
+              RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(), 
exp, tmpLeftKeys, tmpRightKeys, filterNulls);
+
+          return remaining.isAlwaysTrue();
+        }
+      };
+
 
   /** Rule that pushes predicates from a Filter into the Join below them. */
-  public static final FilterJoinRule DRILL_FILTER_ON_JOIN =
-      new FilterJoinRule.FilterIntoJoinRule(true,
-          DrillRelBuilder.proto(RelFactories.DEFAULT_FILTER_FACTORY,
-              RelFactories.DEFAULT_PROJECT_FACTORY),
-          EQUAL_IS_DISTINCT_FROM);
+  public static final FilterJoinRule FILTER_INTO_JOIN =
+      new FilterJoinRule.FilterIntoJoinRule(true, 
DrillRelFactories.LOGICAL_BUILDER, EQUAL_IS_DISTINCT_FROM);
 
+  /** The same as above, but with Drill's operators. */
+  public static final FilterJoinRule DRILL_FILTER_INTO_JOIN =
+      new FilterJoinRule.FilterIntoJoinRule(true,
+          
DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_PROJECT_FACTORY,
+              DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY), 
STRICT_EQUAL_IS_DISTINCT_FROM);
 
   /** Rule that pushes predicates in a Join into the inputs to the join. */
-  public static final FilterJoinRule DRILL_JOIN =
-      new FilterJoinRule.JoinConditionPushRule(
-          DrillRelBuilder.proto(RelFactories.DEFAULT_FILTER_FACTORY,
-              RelFactories.DEFAULT_PROJECT_FACTORY),
-          EQUAL_IS_DISTINCT_FROM);
+  public static final FilterJoinRule JOIN_PUSH_CONDITION =
+      new 
FilterJoinRule.JoinConditionPushRule(DrillRelFactories.LOGICAL_BUILDER, 
EQUAL_IS_DISTINCT_FROM);
 
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
index c7f2838..2a2a09f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
@@ -21,12 +21,15 @@ import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.Pair;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 
@@ -36,7 +39,7 @@ import java.util.List;
 
 public class DrillPushFilterPastProjectRule extends RelOptRule {
 
-  public final static RelOptRule INSTANCE = new 
DrillPushFilterPastProjectRule();
+  public final static RelOptRule INSTANCE = new 
DrillPushFilterPastProjectRule(DrillRelFactories.LOGICAL_BUILDER);
 
   private static final Collection<String> BANNED_OPERATORS;
 
@@ -46,12 +49,8 @@ public class DrillPushFilterPastProjectRule extends 
RelOptRule {
     BANNED_OPERATORS.add("item");
   }
 
-  private DrillPushFilterPastProjectRule() {
-    super(
-        operand(
-            LogicalFilter.class,
-            operand(LogicalProject.class, any())),
-        DrillRelFactories.LOGICAL_BUILDER, null);
+  private DrillPushFilterPastProjectRule(RelBuilderFactory relBuilderFactory) {
+    super(operand(LogicalFilter.class, operand(LogicalProject.class, any())), 
relBuilderFactory,null);
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -60,6 +59,7 @@ public class DrillPushFilterPastProjectRule extends 
RelOptRule {
   public void onMatch(RelOptRuleCall call) {
     Filter filterRel = call.rel(0);
     Project projRel = call.rel(1);
+    RelBuilder builder = call.builder();
 
     // get a conjunctions of the filter condition. For each conjunction, if it 
refers to ITEM or FLATTEN expression
     // then we could not pushed down. Otherwise, it's qualified to be pushed 
down.
@@ -87,11 +87,14 @@ public class DrillPushFilterPastProjectRule extends 
RelOptRule {
     RexNode newCondition =
         RelOptUtil.pushPastProject(qualifedPred, projRel);
 
-    Filter newFilterRel = LogicalFilter.create(projRel.getInput(), 
newCondition);
+    RelNode newFilterRel =
+        builder
+            .push(projRel.getInput())
+            .filter(newCondition)
+            .build();
 
-    Project newProjRel =
-        (Project) relBuilderFactory
-            .create(newFilterRel.getCluster(), null)
+    RelNode newProjRel =
+        builder
             .push(newFilterRel)
             .projectNamed(Pair.left(projRel.getNamedProjects()), 
Pair.right(projRel.getNamedProjects()), true)
             .build();
@@ -108,7 +111,11 @@ public class DrillPushFilterPastProjectRule extends 
RelOptRule {
       //    Project
       //     \
       //      Filter  -- qualified filters
-      Filter filterNotPushed = LogicalFilter.create(newProjRel, 
unqualifiedPred);
+      RelNode filterNotPushed =
+          builder
+              .push(newProjRel)
+              .filter(unqualifiedPred)
+              .build();
       call.transformTo(filterNotPushed);
     }
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index e8dece6..98e017f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -237,12 +237,8 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
       // HEP for rules, which are failed at the LOGICAL_PLANNING stage for 
Volcano planner
       final RelNode setOpTransposeNode = transform(PlannerType.HEP, 
PlannerPhase.PRE_LOGICAL_PLANNING, relNode);
 
-      // HEP Join Push Transitive Predicates
-      final RelNode transitiveClosureNode =
-          transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE, 
setOpTransposeNode);
-
-      // HEP Directory pruning .
-      final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP, 
PlannerPhase.DIRECTORY_PRUNING, transitiveClosureNode);
+      // HEP Directory pruning.
+      final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP, 
PlannerPhase.DIRECTORY_PRUNING, setOpTransposeNode);
       final RelTraitSet logicalTraits = 
pruned.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
 
       final RelNode convertedRelNode;
@@ -254,13 +250,22 @@ public class DefaultSqlHandler extends AbstractSqlHandler 
{
         final RelNode intermediateNode2;
         if (context.getPlannerSettings().isHepPartitionPruningEnabled()) {
 
-          // hep is enabled and hep pruning is enabled.
           final RelNode intermediateNode = transform(PlannerType.VOLCANO, 
PlannerPhase.LOGICAL, pruned, logicalTraits);
-          intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, 
PlannerPhase.PARTITION_PRUNING, intermediateNode);
+
+          // HEP Join Push Transitive Predicates
+          final RelNode transitiveClosureNode =
+              transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE, 
intermediateNode);
+
+          // hep is enabled and hep pruning is enabled.
+          intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, 
PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);
 
         } else {
           // Only hep is enabled
-          intermediateNode2 = transform(PlannerType.VOLCANO, 
PlannerPhase.LOGICAL_PRUNE, pruned, logicalTraits);
+          final RelNode intermediateNode =
+              transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE, 
pruned, logicalTraits);
+
+          // HEP Join Push Transitive Predicates
+          intermediateNode2 = transform(PlannerType.HEP, 
PlannerPhase.TRANSITIVE_CLOSURE, intermediateNode);
         }
 
         // Do Join Planning.
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 548518f..90c00e6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -84,11 +84,11 @@ public abstract class AbstractStoragePlugin implements 
StoragePlugin {
     switch (phase) {
     case LOGICAL_PRUNE_AND_JOIN:
     case LOGICAL_PRUNE:
-    case LOGICAL:
+    case PARTITION_PRUNING:
       return getLogicalOptimizerRules(optimizerContext);
     case PHYSICAL:
       return getPhysicalOptimizerRules(optimizerContext);
-    case PARTITION_PRUNING:
+    case LOGICAL:
     case JOIN_PLANNING:
     default:
       return ImmutableSet.of();
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index 86001f6..febabfe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -433,4 +433,21 @@ public class TestPartitionFilter extends PlanTestBase {
         .run();
 
   }
+
+  @Test // DRILL-6173
+  public void testDirPruningTransitivePredicates() throws Exception {
+    final String query = "select * from dfs.`multilevel/parquet` t1 join 
dfs.`multilevel/parquet2` t2 on " +
+        " t1.dir0 = t2.dir0 where t1.dir0 = '1994' and t1.dir1 = 'Q1'";
+
+    String [] expectedPlan = {"1994"};
+    String [] excluded = {"1995", "Filter\\("};
+
+    // verify we get correct count(*).
+    int actualRowCount = testSql(query);
+    int expectedRowCount = 800;
+    assertEquals("Expected and actual row count should match", 
expectedRowCount, actualRowCount);
+
+    // verify plan that filter is applied in partition pruning.
+    testPlanMatchingPatterns(query, expectedPlan, excluded);
+  }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
index a58aebc..1f8f0d9 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
@@ -190,8 +190,7 @@ public class 
TestParquetFilterPushdownWithTransitivePredicates extends PlanTestB
     testPlanMatchingPatterns(query, expectedPlan);
   }
 
-  @Test // TODO: CALCITE-1048
-  @Ignore // For now plan has "first.*numRowGroups=16"
+  @Test
   public void testForFilterInHaving() throws Exception {
     String query = String.format("SELECT t1.`year`, t2.`year`, t1.`period`, 
t3.`period` FROM %s t1 " +
         "JOIN %s t2 ON t1.`year` = t2.`year` " +
@@ -270,7 +269,5 @@ public class 
TestParquetFilterPushdownWithTransitivePredicates extends PlanTestB
     final String[] expectedPlan = {"first.*numRowGroups=2", 
"second.*numRowGroups=1"};
     testPlanMatchingPatterns(query, expectedPlan);
   }
-
-
 }
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to