[
https://issues.apache.org/jira/browse/DRILL-6374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487794#comment-16487794
]
ASF GitHub Bot commented on DRILL-6374:
---------------------------------------
Ben-Zvi closed pull request #1262: DRILL-6374: Transitive Closure leads to TPCH
Queries regressions and OOM when run concurrency test
URL: https://github.com/apache/drill/pull/1262
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index b5e09ef767..2a79751c61 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -265,8 +265,8 @@ static RuleSet
getDrillUserConfigurableLogicalRules(OptimizerRulesContext optimi
RuleInstance.UNION_TO_DISTINCT_RULE,
// Add support for WHERE style joins.
- DrillFilterJoinRules.DRILL_FILTER_ON_JOIN,
- DrillFilterJoinRules.DRILL_JOIN,
+ DrillFilterJoinRules.FILTER_INTO_JOIN,
+ DrillFilterJoinRules.JOIN_PUSH_CONDITION,
RuleInstance.JOIN_PUSH_EXPRESSIONS_RULE,
// End support for WHERE style joins.
@@ -532,19 +532,18 @@ private static RuleSet getSetOpTransposeRules() {
/**
* RuleSet for join transitive closure, used only in HepPlanner.<p>
- * TODO: {@link RuleInstance#JOIN_PUSH_TRANSITIVE_PREDICATES_RULE} should be
copied to #staticRuleSet,
- * once CALCITE-1048 is solved. This still should be present in {@link
#TRANSITIVE_CLOSURE} stage
- * for applying additional filters before {@link #DIRECTORY_PRUNING}.
+ * TODO: {@link RuleInstance#DRILL_JOIN_PUSH_TRANSITIVE_PREDICATES_RULE}
should be moved into {@link #staticRuleSet},
+ * (with using {@link DrillRelFactories#LOGICAL_BUILDER}) once CALCITE-1048
is solved. This block can be removed then.
*
* @return set of planning rules
*/
static RuleSet getJoinTransitiveClosureRules() {
return RuleSets.ofList(ImmutableSet.<RelOptRule> builder()
.add(
- DrillFilterJoinRules.DRILL_FILTER_ON_JOIN,
- DrillFilterJoinRules.DRILL_JOIN,
- RuleInstance.JOIN_PUSH_TRANSITIVE_PREDICATES_RULE,
- RuleInstance.FILTER_MERGE_RULE
+ RuleInstance.DRILL_JOIN_PUSH_TRANSITIVE_PREDICATES_RULE,
+ DrillFilterJoinRules.DRILL_FILTER_INTO_JOIN,
+ RuleInstance.REMOVE_IS_NOT_DISTINCT_FROM_RULE,
+ RuleInstance.DRILL_FILTER_MERGE_RULE
).build());
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 986fdbd4d1..80bbe88522 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -29,6 +29,7 @@
import org.apache.calcite.rel.rules.AggregateRemoveRule;
import org.apache.calcite.rel.rules.FilterCorrelateRule;
import org.apache.calcite.rel.rules.FilterMergeRule;
+import org.apache.calcite.rel.rules.FilterRemoveIsNotDistinctFromRule;
import org.apache.calcite.rel.rules.FilterSetOpTransposeRule;
import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
@@ -63,6 +64,9 @@
FilterMergeRule FILTER_MERGE_RULE =
new FilterMergeRule(DrillRelFactories.LOGICAL_BUILDER);
+ FilterMergeRule DRILL_FILTER_MERGE_RULE =
+ new
FilterMergeRule(DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
+
FilterCorrelateRule FILTER_CORRELATE_RULE =
new FilterCorrelateRule(DrillRelFactories.LOGICAL_BUILDER);
@@ -120,6 +124,10 @@
* {@link org.apache.calcite.rel.core.Filter}s if those predicates can be
pushed
* to its inputs.
*/
- JoinPushTransitivePredicatesRule JOIN_PUSH_TRANSITIVE_PREDICATES_RULE =
- new JoinPushTransitivePredicatesRule(Join.class,
DrillRelFactories.LOGICAL_BUILDER);
+ JoinPushTransitivePredicatesRule DRILL_JOIN_PUSH_TRANSITIVE_PREDICATES_RULE =
+ new JoinPushTransitivePredicatesRule(Join.class, DrillRelBuilder.proto(
+ DrillRelFactories.DRILL_LOGICAL_JOIN_FACTORY,
DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
+
+ FilterRemoveIsNotDistinctFromRule REMOVE_IS_NOT_DISTINCT_FROM_RULE =
+ new
FilterRemoveIsNotDistinctFromRule(DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
index 6d186e15e8..5e2410319d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterJoinRules.java
@@ -17,16 +17,15 @@
*/
package org.apache.drill.exec.planner.logical;
-import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.rules.FilterJoinRule;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.exec.planner.DrillRelBuilder;
+import java.util.ArrayList;
import java.util.List;
public class DrillFilterJoinRules {
@@ -47,10 +46,10 @@ public boolean apply(Join join, JoinRelType joinType,
RexNode exp) {
// into LEFT/RIGHT.
}
- List<RexNode> tmpLeftKeys = Lists.newArrayList();
- List<RexNode> tmpRightKeys = Lists.newArrayList();
- List<RelDataTypeField> sysFields = Lists.newArrayList();
- List<Integer> filterNulls = Lists.newArrayList();
+ List<RexNode> tmpLeftKeys = new ArrayList<>();
+ List<RexNode> tmpRightKeys = new ArrayList<>();
+ List<RelDataTypeField> sysFields = new ArrayList<>();
+ List<Integer> filterNulls = new ArrayList<>();
RexNode remaining = RelOptUtil.splitJoinCondition(sysFields,
join.getLeft(), join.getRight(),
exp, tmpLeftKeys, tmpRightKeys, filterNulls, null);
@@ -59,20 +58,44 @@ public boolean apply(Join join, JoinRelType joinType,
RexNode exp) {
}
};
+ /** Predicate that always returns true for any filter in OUTER join, and
only true
+ * for strict EQUAL or IS_DISTINCT_FROM conditions (without any mathematical
operations) over RexInputRef in INNER join.
+ * With this predicate, the filter expression that return true will be kept
in the JOIN OP.
+ * Example: INNER JOIN, L.C1 = R.C2 will be kepted in JOIN.
+ * L.C3 + 100 = R.C4 + 100, L.C5 < R.C6 will be
pulled up into Filter above JOIN.
+ * OUTER JOIN, Keep any filter in JOIN.
+ */
+ public static final FilterJoinRule.Predicate STRICT_EQUAL_IS_DISTINCT_FROM =
+ new FilterJoinRule.Predicate() {
+ public boolean apply(Join join, JoinRelType joinType, RexNode exp) {
+ if (joinType != JoinRelType.INNER) {
+ return true;
+ }
+
+ List<Integer> tmpLeftKeys = new ArrayList<>();
+ List<Integer> tmpRightKeys = new ArrayList<>();
+ List<Boolean> filterNulls = new ArrayList<>();
+
+ RexNode remaining =
+ RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(),
exp, tmpLeftKeys, tmpRightKeys, filterNulls);
+
+ return remaining.isAlwaysTrue();
+ }
+ };
+
/** Rule that pushes predicates from a Filter into the Join below them. */
- public static final FilterJoinRule DRILL_FILTER_ON_JOIN =
- new FilterJoinRule.FilterIntoJoinRule(true,
- DrillRelBuilder.proto(RelFactories.DEFAULT_FILTER_FACTORY,
- RelFactories.DEFAULT_PROJECT_FACTORY),
- EQUAL_IS_DISTINCT_FROM);
+ public static final FilterJoinRule FILTER_INTO_JOIN =
+ new FilterJoinRule.FilterIntoJoinRule(true,
DrillRelFactories.LOGICAL_BUILDER, EQUAL_IS_DISTINCT_FROM);
+ /** The same as above, but with Drill's operators. */
+ public static final FilterJoinRule DRILL_FILTER_INTO_JOIN =
+ new FilterJoinRule.FilterIntoJoinRule(true,
+
DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_PROJECT_FACTORY,
+ DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY),
STRICT_EQUAL_IS_DISTINCT_FROM);
/** Rule that pushes predicates in a Join into the inputs to the join. */
- public static final FilterJoinRule DRILL_JOIN =
- new FilterJoinRule.JoinConditionPushRule(
- DrillRelBuilder.proto(RelFactories.DEFAULT_FILTER_FACTORY,
- RelFactories.DEFAULT_PROJECT_FACTORY),
- EQUAL_IS_DISTINCT_FROM);
+ public static final FilterJoinRule JOIN_PUSH_CONDITION =
+ new
FilterJoinRule.JoinConditionPushRule(DrillRelFactories.LOGICAL_BUILDER,
EQUAL_IS_DISTINCT_FROM);
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
index c7f2838397..2a2a09fad3 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
@@ -21,12 +21,15 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.Pair;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
@@ -36,7 +39,7 @@
public class DrillPushFilterPastProjectRule extends RelOptRule {
- public final static RelOptRule INSTANCE = new
DrillPushFilterPastProjectRule();
+ public final static RelOptRule INSTANCE = new
DrillPushFilterPastProjectRule(DrillRelFactories.LOGICAL_BUILDER);
private static final Collection<String> BANNED_OPERATORS;
@@ -46,12 +49,8 @@
BANNED_OPERATORS.add("item");
}
- private DrillPushFilterPastProjectRule() {
- super(
- operand(
- LogicalFilter.class,
- operand(LogicalProject.class, any())),
- DrillRelFactories.LOGICAL_BUILDER, null);
+ private DrillPushFilterPastProjectRule(RelBuilderFactory relBuilderFactory) {
+ super(operand(LogicalFilter.class, operand(LogicalProject.class, any())),
relBuilderFactory,null);
}
//~ Methods ----------------------------------------------------------------
@@ -60,6 +59,7 @@ private DrillPushFilterPastProjectRule() {
public void onMatch(RelOptRuleCall call) {
Filter filterRel = call.rel(0);
Project projRel = call.rel(1);
+ RelBuilder builder = call.builder();
// get a conjunctions of the filter condition. For each conjunction, if it
refers to ITEM or FLATTEN expression
// then we could not pushed down. Otherwise, it's qualified to be pushed
down.
@@ -87,11 +87,14 @@ public void onMatch(RelOptRuleCall call) {
RexNode newCondition =
RelOptUtil.pushPastProject(qualifedPred, projRel);
- Filter newFilterRel = LogicalFilter.create(projRel.getInput(),
newCondition);
+ RelNode newFilterRel =
+ builder
+ .push(projRel.getInput())
+ .filter(newCondition)
+ .build();
- Project newProjRel =
- (Project) relBuilderFactory
- .create(newFilterRel.getCluster(), null)
+ RelNode newProjRel =
+ builder
.push(newFilterRel)
.projectNamed(Pair.left(projRel.getNamedProjects()),
Pair.right(projRel.getNamedProjects()), true)
.build();
@@ -108,7 +111,11 @@ public void onMatch(RelOptRuleCall call) {
// Project
// \
// Filter -- qualified filters
- Filter filterNotPushed = LogicalFilter.create(newProjRel,
unqualifiedPred);
+ RelNode filterNotPushed =
+ builder
+ .push(newProjRel)
+ .filter(unqualifiedPred)
+ .build();
call.transformTo(filterNotPushed);
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index e8dece6790..98e017fa07 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -237,12 +237,8 @@ protected DrillRel convertToRawDrel(final RelNode relNode)
throws SqlUnsupported
// HEP for rules, which are failed at the LOGICAL_PLANNING stage for
Volcano planner
final RelNode setOpTransposeNode = transform(PlannerType.HEP,
PlannerPhase.PRE_LOGICAL_PLANNING, relNode);
- // HEP Join Push Transitive Predicates
- final RelNode transitiveClosureNode =
- transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE,
setOpTransposeNode);
-
- // HEP Directory pruning .
- final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP,
PlannerPhase.DIRECTORY_PRUNING, transitiveClosureNode);
+ // HEP Directory pruning.
+ final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP,
PlannerPhase.DIRECTORY_PRUNING, setOpTransposeNode);
final RelTraitSet logicalTraits =
pruned.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
final RelNode convertedRelNode;
@@ -254,13 +250,22 @@ protected DrillRel convertToRawDrel(final RelNode
relNode) throws SqlUnsupported
final RelNode intermediateNode2;
if (context.getPlannerSettings().isHepPartitionPruningEnabled()) {
- // hep is enabled and hep pruning is enabled.
final RelNode intermediateNode = transform(PlannerType.VOLCANO,
PlannerPhase.LOGICAL, pruned, logicalTraits);
- intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP,
PlannerPhase.PARTITION_PRUNING, intermediateNode);
+
+ // HEP Join Push Transitive Predicates
+ final RelNode transitiveClosureNode =
+ transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE,
intermediateNode);
+
+ // hep is enabled and hep pruning is enabled.
+ intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP,
PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);
} else {
// Only hep is enabled
- intermediateNode2 = transform(PlannerType.VOLCANO,
PlannerPhase.LOGICAL_PRUNE, pruned, logicalTraits);
+ final RelNode intermediateNode =
+ transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE,
pruned, logicalTraits);
+
+ // HEP Join Push Transitive Predicates
+ intermediateNode2 = transform(PlannerType.HEP,
PlannerPhase.TRANSITIVE_CLOSURE, intermediateNode);
}
// Do Join Planning.
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 548518f87e..90c00e6ab6 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -84,11 +84,11 @@ public boolean supportsWrite() {
switch (phase) {
case LOGICAL_PRUNE_AND_JOIN:
case LOGICAL_PRUNE:
- case LOGICAL:
+ case PARTITION_PRUNING:
return getLogicalOptimizerRules(optimizerContext);
case PHYSICAL:
return getPhysicalOptimizerRules(optimizerContext);
- case PARTITION_PRUNING:
+ case LOGICAL:
case JOIN_PLANNING:
default:
return ImmutableSet.of();
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index 86001f6e18..febabfef20 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -433,4 +433,21 @@ public void testPruneSameTableInJoin() throws Exception {
.run();
}
+
+ @Test // DRILL-6173
+ public void testDirPruningTransitivePredicates() throws Exception {
+ final String query = "select * from dfs.`multilevel/parquet` t1 join
dfs.`multilevel/parquet2` t2 on " +
+ " t1.dir0 = t2.dir0 where t1.dir0 = '1994' and t1.dir1 = 'Q1'";
+
+ String [] expectedPlan = {"1994"};
+ String [] excluded = {"1995", "Filter\\("};
+
+ // verify we get correct count(*).
+ int actualRowCount = testSql(query);
+ int expectedRowCount = 800;
+ assertEquals("Expected and actual row count should match",
expectedRowCount, actualRowCount);
+
+ // verify plan that filter is applied in partition pruning.
+ testPlanMatchingPatterns(query, expectedPlan, excluded);
+ }
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
index a58aebc082..1f8f0d90a7 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushdownWithTransitivePredicates.java
@@ -190,8 +190,7 @@ public void testForTwoExists() throws Exception {
testPlanMatchingPatterns(query, expectedPlan);
}
- @Test // TODO: CALCITE-1048
- @Ignore // For now plan has "first.*numRowGroups=16"
+ @Test
public void testForFilterInHaving() throws Exception {
String query = String.format("SELECT t1.`year`, t2.`year`, t1.`period`,
t3.`period` FROM %s t1 " +
"JOIN %s t2 ON t1.`year` = t2.`year` " +
@@ -270,7 +269,5 @@ public void testForWithStatementAndDynamicStar() throws
Exception {
final String[] expectedPlan = {"first.*numRowGroups=2",
"second.*numRowGroups=1"};
testPlanMatchingPatterns(query, expectedPlan);
}
-
-
}
diff --git a/pom.xml b/pom.xml
index 620f73c5de..9bc75fa2bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
<dep.guava.version>18.0</dep.guava.version>
<forkCount>2</forkCount>
<parquet.version>1.8.1-drill-r0</parquet.version>
- <calcite.version>1.16.0-drill-r1</calcite.version>
+ <calcite.version>1.16.0-drill-r2</calcite.version>
<avatica.version>1.11.0</avatica.version>
<janino.version>2.7.6</janino.version>
<sqlline.version>1.1.9-drill-r7</sqlline.version>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Transitive Closure leads to TPCH Queries regressions and OOM when run
> concurrency test
> --------------------------------------------------------------------------------------
>
> Key: DRILL-6374
> URL: https://issues.apache.org/jira/browse/DRILL-6374
> Project: Apache Drill
> Issue Type: Bug
> Components: Functions - Drill
> Affects Versions: 1.14.0
> Environment: RHEL 7
> Reporter: Dechang Gu
> Assignee: Vitalii Diravka
> Priority: Critical
> Labels: ready-to-commit
> Fix For: 1.14.0
>
> Attachments: TPCH_09_2_id_2517381b-1a61-3db5-40c3-4463bd421365.json,
> TPCH_09_2_id_2517497b-d4da-dab6-6124-abde5804a25f.json
>
>
> When the TPCH regression test is run on Apache Drill 1.14.0 master commit
> 6fcaf4268eddcb09010b5d9c5dfb3b3be5c3f903 (DRILL-6173), most of the queries
> regress.
> In particular, TPC-H Query 9 takes about 4x as long (36 sec vs 8.6 sec)
> as it does when run against the parent commit
> (9173308710c3decf8ff745493ad3e85ccdaf7c37).
> Furthermore, in the concurrency test for this commit, with 48 clients each
> running 16 TPCH queries (768 queries executed in total) with
> planner.width.max_per_node=5, some queries hit OOM, causing 273 queries
> to fail, whereas for the parent commit all 768 queries completed
> successfully.
>
> Profiles for TPCH_09 from the regression tests are attached:
> * The failing commit file name:
> [^TPCH_09_2_id_2517381b-1a61-3db5-40c3-4463bd421365.json]
> * The parent commit file name:
> [^TPCH_09_2_id_2517497b-d4da-dab6-6124-abde5804a25f.json]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)