This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 1fe31c3 DRILL-6374: Transitive Closure leads to TPCH Queries
regressions and OOM when run concurrency test
1fe31c3 is described below
commit 1fe31c34ebf98bd913a91a8223b6f6b29c80f4e8
Author: Vitalii Diravka <[email protected]>
AuthorDate: Mon May 14 15:27:16 2018 +0300
DRILL-6374: Transitive Closure leads to TPCH Queries regressions and OOM
when run concurrency test
- Test case for Directory Pruning with Transitive Predicates
- Improving description for STRICT_EQUAL_IS_DISTINCT_FROM predicate
closes #1262
---
.../apache/drill/exec/planner/PlannerPhase.java | 17 ++++---
.../apache/drill/exec/planner/RuleInstance.java | 12 ++++-
.../exec/planner/logical/DrillFilterJoinRules.java | 55 +++++++++++++++-------
.../logical/DrillPushFilterPastProjectRule.java | 31 +++++++-----
.../planner/sql/handlers/DefaultSqlHandler.java | 23 +++++----
.../drill/exec/store/AbstractStoragePlugin.java | 4 +-
.../java/org/apache/drill/TestPartitionFilter.java | 17 +++++++
...quetFilterPushdownWithTransitivePredicates.java | 5 +-
8 files changed, 110 insertions(+), 54 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index b5e09ef..2a79751 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -265,8 +265,8 @@ public enum PlannerPhase {
RuleInstance.UNION_TO_DISTINCT_RULE,
// Add support for WHERE style joins.
- DrillFilterJoinRules.DRILL_FILTER_ON_JOIN,
- DrillFilterJoinRules.DRILL_JOIN,
+ DrillFilterJoinRules.FILTER_INTO_JOIN,
+ DrillFilterJoinRules.JOIN_PUSH_CONDITION,
RuleInstance.JOIN_PUSH_EXPRESSIONS_RULE,
// End support for WHERE style joins.
@@ -532,19 +532,18 @@ public enum PlannerPhase {
/**
* RuleSet for join transitive closure, used only in HepPlanner.<p>
- * TODO: {@link RuleInstance#JOIN_PUSH_TRANSITIVE_PREDICATES_RULE} should be
copied to #staticRuleSet,
- * once CALCITE-1048 is solved. This still should be present in {@link
#TRANSITIVE_CLOSURE} stage
- * for applying additional filters before {@link #DIRECTORY_PRUNING}.
+ * TODO: {@link RuleInstance#DRILL_JOIN_PUSH_TRANSITIVE_PREDICATES_RULE}
should be moved into {@link #staticRuleSet},
+ * (using {@link DrillRelFactories#LOGICAL_BUILDER}) once CALCITE-1048
is solved. This block can be removed then.
*
* @return set of planning rules
*/
static RuleSet getJoinTransitiveClosureRules() {
return RuleSets.ofList(ImmutableSet.<RelOptRule> builder()
.add(
- DrillFilterJoinRules.DRILL_FILTER_ON_JOIN,
- DrillFilterJoinRules.DRILL_JOIN,
- RuleInstance.JOIN_PUSH_TRANSITIVE_PREDICATES_RULE,
- RuleInstance.FILTER_MERGE_RULE
+ RuleInstance.DRILL_JOIN_PUSH_TRANSITIVE_PREDICATES_RULE,
+ DrillFilterJoinRules.DRILL_FILTER_INTO_JOIN,
+ RuleInstance.REMOVE_IS_NOT_DISTINCT_FROM_RULE,
+ RuleInstance.DRILL_FILTER_MERGE_RULE
).build());
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 986fdbd..80bbe88 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -29,6 +29,7 @@ import
org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
import org.apache.calcite.rel.rules.AggregateRemoveRule;
import org.apache.calcite.rel.rules.FilterCorrelateRule;
import org.apache.calcite.rel.rules.FilterMergeRule;
+import org.apache.calcite.rel.rules.FilterRemoveIsNotDistinctFromRule;
import org.apache.calcite.rel.rules.FilterSetOpTransposeRule;
import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
@@ -63,6 +64,9 @@ public interface RuleInstance {
FilterMergeRule FILTER_MERGE_RULE =
new FilterMergeRule(DrillRelFactories.LOGICAL_BUILDER);
+ FilterMergeRule DRILL_FILTER_MERGE_RULE =
+ new
FilterMergeRule(DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
+
FilterCorrelateRule FILTER_CORRELATE_RULE =
new FilterCorrelateRule(DrillRelFactories.LOGICAL_BUILDER);
@@ -120,6 +124,10 @@ public interface RuleInstance {
* {@link org.apache.calcite.rel.core.Filter}s if those predicates can be
pushed
* to its inputs.
*/
- JoinPushTransitivePredicatesRule JOIN_PUSH_TRANSITIVE_PREDICATES_RULE =
- new JoinPushTransitivePredicatesRule(Join.class,
DrillRelFactories.LOGICAL_BUILDER);
+ JoinPushTransitivePredicatesRule DRILL_JOIN_PUSH_TRANSITIVE_PREDICATES_RULE =
+ new JoinPushTransitivePredicatesRule(Join.class, DrillRelBuilder.proto(
+ DrillRelFactories.DRILL_LOGICAL_JOIN_FACTORY,
DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
+
+ FilterRemoveIsNotDistinctFromRule REMOVE_IS_NOT_DISTINCT_FROM_RULE =
+ new
FilterRemoveIsNotDistinctFromRule(DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
index 6d186e1..5e24103 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
@@ -17,16 +17,15 @@
*/
package org.apache.drill.exec.planner.logical;
-import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.rules.FilterJoinRule;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.exec.planner.DrillRelBuilder;
+import java.util.ArrayList;
import java.util.List;
public class DrillFilterJoinRules {
@@ -47,10 +46,10 @@ public class DrillFilterJoinRules {
// into LEFT/RIGHT.
}
- List<RexNode> tmpLeftKeys = Lists.newArrayList();
- List<RexNode> tmpRightKeys = Lists.newArrayList();
- List<RelDataTypeField> sysFields = Lists.newArrayList();
- List<Integer> filterNulls = Lists.newArrayList();
+ List<RexNode> tmpLeftKeys = new ArrayList<>();
+ List<RexNode> tmpRightKeys = new ArrayList<>();
+ List<RelDataTypeField> sysFields = new ArrayList<>();
+ List<Integer> filterNulls = new ArrayList<>();
RexNode remaining = RelOptUtil.splitJoinCondition(sysFields,
join.getLeft(), join.getRight(),
exp, tmpLeftKeys, tmpRightKeys, filterNulls, null);
@@ -59,20 +58,44 @@ public class DrillFilterJoinRules {
}
};
+ /** Predicate that always returns true for any filter in OUTER join, and
only true
+ * for strict EQUAL or IS_DISTINCT_FROM conditions (without any mathematical
operations) over RexInputRef in INNER join.
+ * With this predicate, the filter expression that returns true will be kept
in the JOIN OP.
+ * Example: INNER JOIN, L.C1 = R.C2 will be kept in JOIN.
+ * L.C3 + 100 = R.C4 + 100, L.C5 < R.C6 will be
pulled up into Filter above JOIN.
+ * OUTER JOIN, Keep any filter in JOIN.
+ */
+ public static final FilterJoinRule.Predicate STRICT_EQUAL_IS_DISTINCT_FROM =
+ new FilterJoinRule.Predicate() {
+ public boolean apply(Join join, JoinRelType joinType, RexNode exp) {
+ if (joinType != JoinRelType.INNER) {
+ return true;
+ }
+
+ List<Integer> tmpLeftKeys = new ArrayList<>();
+ List<Integer> tmpRightKeys = new ArrayList<>();
+ List<Boolean> filterNulls = new ArrayList<>();
+
+ RexNode remaining =
+ RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(),
exp, tmpLeftKeys, tmpRightKeys, filterNulls);
+
+ return remaining.isAlwaysTrue();
+ }
+ };
+
/** Rule that pushes predicates from a Filter into the Join below them. */
- public static final FilterJoinRule DRILL_FILTER_ON_JOIN =
- new FilterJoinRule.FilterIntoJoinRule(true,
- DrillRelBuilder.proto(RelFactories.DEFAULT_FILTER_FACTORY,
- RelFactories.DEFAULT_PROJECT_FACTORY),
- EQUAL_IS_DISTINCT_FROM);
+ public static final FilterJoinRule FILTER_INTO_JOIN =
+ new FilterJoinRule.FilterIntoJoinRule(true,
DrillRelFactories.LOGICAL_BUILDER, EQUAL_IS_DISTINCT_FROM);
+ /** The same as above, but with Drill's operators. */
+ public static final FilterJoinRule DRILL_FILTER_INTO_JOIN =
+ new FilterJoinRule.FilterIntoJoinRule(true,
+
DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_PROJECT_FACTORY,
+ DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY),
STRICT_EQUAL_IS_DISTINCT_FROM);
/** Rule that pushes predicates in a Join into the inputs to the join. */
- public static final FilterJoinRule DRILL_JOIN =
- new FilterJoinRule.JoinConditionPushRule(
- DrillRelBuilder.proto(RelFactories.DEFAULT_FILTER_FACTORY,
- RelFactories.DEFAULT_PROJECT_FACTORY),
- EQUAL_IS_DISTINCT_FROM);
+ public static final FilterJoinRule JOIN_PUSH_CONDITION =
+ new
FilterJoinRule.JoinConditionPushRule(DrillRelFactories.LOGICAL_BUILDER,
EQUAL_IS_DISTINCT_FROM);
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
index c7f2838..2a2a09f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
@@ -21,12 +21,15 @@ import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.Pair;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
@@ -36,7 +39,7 @@ import java.util.List;
public class DrillPushFilterPastProjectRule extends RelOptRule {
- public final static RelOptRule INSTANCE = new
DrillPushFilterPastProjectRule();
+ public final static RelOptRule INSTANCE = new
DrillPushFilterPastProjectRule(DrillRelFactories.LOGICAL_BUILDER);
private static final Collection<String> BANNED_OPERATORS;
@@ -46,12 +49,8 @@ public class DrillPushFilterPastProjectRule extends
RelOptRule {
BANNED_OPERATORS.add("item");
}
- private DrillPushFilterPastProjectRule() {
- super(
- operand(
- LogicalFilter.class,
- operand(LogicalProject.class, any())),
- DrillRelFactories.LOGICAL_BUILDER, null);
+ private DrillPushFilterPastProjectRule(RelBuilderFactory relBuilderFactory) {
+ super(operand(LogicalFilter.class, operand(LogicalProject.class, any())),
relBuilderFactory,null);
}
//~ Methods ----------------------------------------------------------------
@@ -60,6 +59,7 @@ public class DrillPushFilterPastProjectRule extends
RelOptRule {
public void onMatch(RelOptRuleCall call) {
Filter filterRel = call.rel(0);
Project projRel = call.rel(1);
+ RelBuilder builder = call.builder();
// get a conjunctions of the filter condition. For each conjunction, if it
refers to ITEM or FLATTEN expression
// then we could not pushed down. Otherwise, it's qualified to be pushed
down.
@@ -87,11 +87,14 @@ public class DrillPushFilterPastProjectRule extends
RelOptRule {
RexNode newCondition =
RelOptUtil.pushPastProject(qualifedPred, projRel);
- Filter newFilterRel = LogicalFilter.create(projRel.getInput(),
newCondition);
+ RelNode newFilterRel =
+ builder
+ .push(projRel.getInput())
+ .filter(newCondition)
+ .build();
- Project newProjRel =
- (Project) relBuilderFactory
- .create(newFilterRel.getCluster(), null)
+ RelNode newProjRel =
+ builder
.push(newFilterRel)
.projectNamed(Pair.left(projRel.getNamedProjects()),
Pair.right(projRel.getNamedProjects()), true)
.build();
@@ -108,7 +111,11 @@ public class DrillPushFilterPastProjectRule extends
RelOptRule {
// Project
// \
// Filter -- qualified filters
- Filter filterNotPushed = LogicalFilter.create(newProjRel,
unqualifiedPred);
+ RelNode filterNotPushed =
+ builder
+ .push(newProjRel)
+ .filter(unqualifiedPred)
+ .build();
call.transformTo(filterNotPushed);
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index e8dece6..98e017f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -237,12 +237,8 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
// HEP for rules, which are failed at the LOGICAL_PLANNING stage for
Volcano planner
final RelNode setOpTransposeNode = transform(PlannerType.HEP,
PlannerPhase.PRE_LOGICAL_PLANNING, relNode);
- // HEP Join Push Transitive Predicates
- final RelNode transitiveClosureNode =
- transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE,
setOpTransposeNode);
-
- // HEP Directory pruning .
- final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP,
PlannerPhase.DIRECTORY_PRUNING, transitiveClosureNode);
+ // HEP Directory pruning.
+ final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP,
PlannerPhase.DIRECTORY_PRUNING, setOpTransposeNode);
final RelTraitSet logicalTraits =
pruned.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
final RelNode convertedRelNode;
@@ -254,13 +250,22 @@ public class DefaultSqlHandler extends AbstractSqlHandler
{
final RelNode intermediateNode2;
if (context.getPlannerSettings().isHepPartitionPruningEnabled()) {
- // hep is enabled and hep pruning is enabled.
final RelNode intermediateNode = transform(PlannerType.VOLCANO,
PlannerPhase.LOGICAL, pruned, logicalTraits);
- intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP,
PlannerPhase.PARTITION_PRUNING, intermediateNode);
+
+ // HEP Join Push Transitive Predicates
+ final RelNode transitiveClosureNode =
+ transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE,
intermediateNode);
+
+ // hep is enabled and hep pruning is enabled.
+ intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP,
PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);
} else {
// Only hep is enabled
- intermediateNode2 = transform(PlannerType.VOLCANO,
PlannerPhase.LOGICAL_PRUNE, pruned, logicalTraits);
+ final RelNode intermediateNode =
+ transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE,
pruned, logicalTraits);
+
+ // HEP Join Push Transitive Predicates
+ intermediateNode2 = transform(PlannerType.HEP,
PlannerPhase.TRANSITIVE_CLOSURE, intermediateNode);
}
// Do Join Planning.
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 548518f..90c00e6 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -84,11 +84,11 @@ public abstract class AbstractStoragePlugin implements
StoragePlugin {
switch (phase) {
case LOGICAL_PRUNE_AND_JOIN:
case LOGICAL_PRUNE:
- case LOGICAL:
+ case PARTITION_PRUNING:
return getLogicalOptimizerRules(optimizerContext);
case PHYSICAL:
return getPhysicalOptimizerRules(optimizerContext);
- case PARTITION_PRUNING:
+ case LOGICAL:
case JOIN_PLANNING:
default:
return ImmutableSet.of();
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index 86001f6..febabfe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -433,4 +433,21 @@ public class TestPartitionFilter extends PlanTestBase {
.run();
}
+
+ @Test // DRILL-6173
+ public void testDirPruningTransitivePredicates() throws Exception {
+ final String query = "select * from dfs.`multilevel/parquet` t1 join
dfs.`multilevel/parquet2` t2 on " +
+ " t1.dir0 = t2.dir0 where t1.dir0 = '1994' and t1.dir1 = 'Q1'";
+
+ String [] expectedPlan = {"1994"};
+ String [] excluded = {"1995", "Filter\\("};
+
+ // verify we get correct count(*).
+ int actualRowCount = testSql(query);
+ int expectedRowCount = 800;
+ assertEquals("Expected and actual row count should match",
expectedRowCount, actualRowCount);
+
+ // verify plan that filter is applied in partition pruning.
+ testPlanMatchingPatterns(query, expectedPlan, excluded);
+ }
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
index a58aebc..1f8f0d9 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
@@ -190,8 +190,7 @@ public class
TestParquetFilterPushdownWithTransitivePredicates extends PlanTestB
testPlanMatchingPatterns(query, expectedPlan);
}
- @Test // TODO: CALCITE-1048
- @Ignore // For now plan has "first.*numRowGroups=16"
+ @Test
public void testForFilterInHaving() throws Exception {
String query = String.format("SELECT t1.`year`, t2.`year`, t1.`period`,
t3.`period` FROM %s t1 " +
"JOIN %s t2 ON t1.`year` = t2.`year` " +
@@ -270,7 +269,5 @@ public class
TestParquetFilterPushdownWithTransitivePredicates extends PlanTestB
final String[] expectedPlan = {"first.*numRowGroups=2",
"second.*numRowGroups=1"};
testPlanMatchingPatterns(query, expectedPlan);
}
-
-
}
--
To stop receiving notification emails like this one, please contact
[email protected].