This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fc7cb3305ea [feat](nereids) support null safe eq runtime filter (FE
part) (#31655)
fc7cb3305ea is described below
commit fc7cb3305eab09d64911d693eac50307d52c9b05
Author: minghong <[email protected]>
AuthorDate: Thu Mar 7 14:29:25 2024 +0800
[feat](nereids) support null safe eq runtime filter (FE part) (#31655)
be part has been merged in #31754
---
.../processor/post/RuntimeFilterGenerator.java | 34 +++++------
.../rules/expression/ExpressionOptimization.java | 4 +-
.../expression/rules/NullSafeEqualToEqual.java | 62 +++++++++++++++++++
.../rules/rewrite/FindHashConditionForJoin.java | 3 +-
.../trees/plans/physical/AbstractPhysicalPlan.java | 10 ++++
.../org/apache/doris/planner/HashJoinNode.java | 6 --
.../org/apache/doris/planner/RuntimeFilter.java | 9 +++
.../main/java/org/apache/doris/qe/Coordinator.java | 4 +-
.../expression/rules/NullSafeEqualToEqualTest.java | 69 ++++++++++++++++++++++
.../rules/rewrite/EliminateJoinByUniqueTest.java | 14 -----
10 files changed, 174 insertions(+), 41 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index f7a7c166bd7..8da3ede4200 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.stats.ExpressionEstimation;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
+import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
@@ -268,25 +269,22 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
List<Expression> hashJoinConjuncts =
join.getHashJoinConjuncts().stream().collect(Collectors.toList());
boolean buildSideContainsConsumer =
hasCTEConsumerDescendant((PhysicalPlan) join.right());
for (int i = 0; i < hashJoinConjuncts.size(); i++) {
- // BE do not support RF generated from NullSafeEqual, skip them
- if (hashJoinConjuncts.get(i) instanceof EqualTo) {
- EqualTo equalTo = ((EqualTo)
JoinUtils.swapEqualToForChildrenOrder(
- (EqualTo) hashJoinConjuncts.get(i),
join.left().getOutputSet()));
- for (TRuntimeFilterType type : legalTypes) {
- //bitmap rf is generated by nested loop join.
- if (type == TRuntimeFilterType.BITMAP) {
- continue;
- }
- long buildSideNdv = getBuildSideNdv(join, equalTo);
- Pair<PhysicalRelation, Slot> pair =
ctx.getAliasTransferMap().get(equalTo.right());
- // CteConsumer is not allowed to generate RF in order to
avoid RF cycle.
- if ((pair == null && buildSideContainsConsumer)
- || (pair != null && pair.first instanceof
PhysicalCTEConsumer)) {
- continue;
- }
- join.pushDownRuntimeFilter(context, generator, join,
equalTo.right(),
- equalTo.left(), type, buildSideNdv, i);
+ EqualPredicate equalTo = JoinUtils.swapEqualToForChildrenOrder(
+ (EqualPredicate) hashJoinConjuncts.get(i),
join.left().getOutputSet());
+ for (TRuntimeFilterType type : legalTypes) {
+ //bitmap rf is generated by nested loop join.
+ if (type == TRuntimeFilterType.BITMAP) {
+ continue;
+ }
+ long buildSideNdv = getBuildSideNdv(join, equalTo);
+ Pair<PhysicalRelation, Slot> pair =
ctx.getAliasTransferMap().get(equalTo.right());
+ // CteConsumer is not allowed to generate RF in order to avoid
RF cycle.
+ if ((pair == null && buildSideContainsConsumer)
+ || (pair != null && pair.first instanceof
PhysicalCTEConsumer)) {
+ continue;
}
+ join.pushDownRuntimeFilter(context, generator, join,
equalTo.right(),
+ equalTo.left(), type, buildSideNdv, i);
}
}
return join;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
index e7b3a308f0f..fdf9820c582 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
@@ -22,6 +22,7 @@ import
org.apache.doris.nereids.rules.expression.rules.CaseWhenToIf;
import org.apache.doris.nereids.rules.expression.rules.DateFunctionRewrite;
import org.apache.doris.nereids.rules.expression.rules.DistinctPredicatesRule;
import org.apache.doris.nereids.rules.expression.rules.ExtractCommonFactorRule;
+import org.apache.doris.nereids.rules.expression.rules.NullSafeEqualToEqual;
import org.apache.doris.nereids.rules.expression.rules.OrToIn;
import
org.apache.doris.nereids.rules.expression.rules.SimplifyComparisonPredicate;
import
org.apache.doris.nereids.rules.expression.rules.SimplifyDecimalV3Comparison;
@@ -48,7 +49,8 @@ public class ExpressionOptimization extends ExpressionRewrite
{
OrToIn.INSTANCE,
ArrayContainToArrayOverlap.INSTANCE,
CaseWhenToIf.INSTANCE,
- TopnToMax.INSTANCE
+ TopnToMax.INSTANCE,
+ NullSafeEqualToEqual.INSTANCE
);
private static final ExpressionRuleExecutor EXECUTOR = new
ExpressionRuleExecutor(OPTIMIZE_REWRITE_RULES);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/NullSafeEqualToEqual.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/NullSafeEqualToEqual.java
new file mode 100644
index 00000000000..c215e65f722
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/NullSafeEqualToEqual.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.expression.rules;
+
+import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
+import org.apache.doris.nereids.rules.expression.ExpressionRewriteRule;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.IsNull;
+import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
+import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import
org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
+
+/**
+ * convert "<=>" to "=", if any side is not nullable
+ * convert "A <=> null" to "A is null"
+ */
+public class NullSafeEqualToEqual extends
DefaultExpressionRewriter<ExpressionRewriteContext> implements
+ ExpressionRewriteRule<ExpressionRewriteContext> {
+ public static final NullSafeEqualToEqual INSTANCE = new
NullSafeEqualToEqual();
+
+ @Override
+ public Expression rewrite(Expression expr, ExpressionRewriteContext ctx) {
+ return expr.accept(this, null);
+ }
+
+ @Override
+ public Expression visitNullSafeEqual(NullSafeEqual nullSafeEqual,
ExpressionRewriteContext ctx) {
+ if (nullSafeEqual.left() instanceof NullLiteral) {
+ if (nullSafeEqual.right().nullable()) {
+ return new IsNull(nullSafeEqual.right());
+ } else {
+ return BooleanLiteral.FALSE;
+ }
+ } else if (nullSafeEqual.right() instanceof NullLiteral) {
+ if (nullSafeEqual.left().nullable()) {
+ return new IsNull(nullSafeEqual.left());
+ } else {
+ return BooleanLiteral.FALSE;
+ }
+ } else if (!nullSafeEqual.left().nullable() ||
!nullSafeEqual.right().nullable()) {
+ return new EqualTo(nullSafeEqual.left(), nullSafeEqual.right());
+ }
+ return nullSafeEqual;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/FindHashConditionForJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/FindHashConditionForJoin.java
index 2826d709065..86bcdf4487b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/FindHashConditionForJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/FindHashConditionForJoin.java
@@ -63,7 +63,8 @@ public class FindHashConditionForJoin extends
OneRewriteRuleFactory {
}
List<Expression> combinedHashJoinConjuncts = Streams
- .concat(join.getHashJoinConjuncts().stream(),
extractedHashJoinConjuncts.stream())
+ .concat(join.getHashJoinConjuncts().stream(),
+ extractedHashJoinConjuncts.stream())
.distinct()
.collect(ImmutableList.toImmutableList());
JoinType joinType = join.getJoinType();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
index 1e9135d600e..9f71bda0b40 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
@@ -24,7 +24,9 @@ import
org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Explainable;
@@ -131,6 +133,14 @@ public abstract class AbstractPhysicalPlan extends
AbstractPlan implements Physi
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair(probeSlot).first, scanSlot);
}
} else {
+ // null safe equal runtime filter only support bloom filter
+ EqualPredicate eq = (EqualPredicate)
builderNode.getHashJoinConjuncts().get(exprOrder);
+ if (eq instanceof NullSafeEqual && type ==
TRuntimeFilterType.IN_OR_BLOOM) {
+ type = TRuntimeFilterType.BLOOM;
+ }
+ if (eq instanceof NullSafeEqual && type !=
TRuntimeFilterType.BLOOM) {
+ return false;
+ }
filter = new RuntimeFilter(generator.getNextId(),
src, ImmutableList.of(scanSlot),
ImmutableList.of(probeExpr),
type, exprOrder, builderNode, buildSideNdv,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 1413d42ceee..d8cc4a77a0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -199,12 +199,6 @@ public class HashJoinNode extends JoinNodeBase {
for (Expr eqJoinPredicate : eqJoinConjuncts) {
Preconditions.checkArgument(eqJoinPredicate instanceof
BinaryPredicate);
BinaryPredicate eqJoin = (BinaryPredicate) eqJoinPredicate;
- if (eqJoin.getOp().equals(BinaryPredicate.Operator.EQ_FOR_NULL)) {
- Preconditions.checkArgument(eqJoin.getChildren().size() == 2);
- if (!eqJoin.getChild(0).isNullable() ||
!eqJoin.getChild(1).isNullable()) {
- eqJoin.setOp(BinaryPredicate.Operator.EQ);
- }
- }
this.eqJoinConjuncts.add(eqJoin);
}
this.distrMode = DistributionMode.NONE;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index ade087e2de0..00117beae72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -230,6 +230,15 @@ public final class RuntimeFilter {
}
tFilter.setOptRemoteRf(hasRemoteTargets);
tFilter.setBloomFilterSizeCalculatedByNdv(bloomFilterSizeCalculatedByNdv);
+ if (builderNode instanceof HashJoinNode) {
+ HashJoinNode join = (HashJoinNode) builderNode;
+ BinaryPredicate eq = join.getEqJoinConjuncts().get(exprOrder);
+ if (eq.getOp().equals(BinaryPredicate.Operator.EQ_FOR_NULL)) {
+ tFilter.setNullAware(true);
+ } else {
+ tFilter.setNullAware(false);
+ }
+ }
return tFilter;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ba2e4da9cc6..d9358599a15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -96,6 +96,7 @@ import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TPipelineInstanceParams;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TPlanFragmentDestination;
import org.apache.doris.thrift.TPlanFragmentExecParams;
import org.apache.doris.thrift.TQueryGlobals;
@@ -3767,6 +3768,7 @@ public class Coordinator implements CoordInterface {
Map<TNetworkAddress, TPipelineFragmentParams> res = new HashMap();
Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
+ TPlanFragment fragmentThrift = fragment.toThrift();
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam =
instanceExecParams.get(i);
Map<Integer, List<TScanRangeParams>> scanRanges =
instanceExecParam.perNodeScanRanges;
@@ -3792,7 +3794,7 @@ public class Coordinator implements CoordInterface {
params.query_options.setMemLimit(memLimit);
params.setSendQueryStatisticsWithEveryBatch(
fragment.isTransferQueryStatisticsWithEveryBatch());
- params.setFragment(fragment.toThrift());
+ params.setFragment(fragmentThrift);
params.setLocalParams(Lists.newArrayList());
if (tWorkloadGroups != null) {
params.setWorkloadGroups(tWorkloadGroups);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/rules/NullSafeEqualToEqualTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/rules/NullSafeEqualToEqualTest.java
new file mode 100644
index 00000000000..3a2b4eb2a6e
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/rules/NullSafeEqualToEqualTest.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.expression.rules;
+
+import org.apache.doris.nereids.rules.expression.ExpressionRewriteTestHelper;
+import org.apache.doris.nereids.rules.expression.ExpressionRuleExecutor;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.IsNull;
+import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
+import org.apache.doris.nereids.types.StringType;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+class NullSafeEqualToEqualTest extends ExpressionRewriteTestHelper {
+
+ // "A<=> Null" to "A is null"
+ @Test
+ void testNullSafeEqualToIsNull() {
+ executor = new
ExpressionRuleExecutor(ImmutableList.of(NullSafeEqualToEqual.INSTANCE));
+ SlotReference slot = new SlotReference("a", StringType.INSTANCE, true);
+ assertRewrite(new NullSafeEqual(slot, NullLiteral.INSTANCE), new
IsNull(slot));
+ }
+
+ // "A<=> Null" to "False", when A is not nullable
+ @Test
+ void testNullSafeEqualToFalse() {
+ executor = new
ExpressionRuleExecutor(ImmutableList.of(NullSafeEqualToEqual.INSTANCE));
+ SlotReference slot = new SlotReference("a", StringType.INSTANCE,
false);
+ assertRewrite(new NullSafeEqual(slot, NullLiteral.INSTANCE),
BooleanLiteral.FALSE);
+ }
+
+ // "A<=> "abc" to "A = "abc"
+ @Test
+ void testNullSafeEqualToEqual() {
+ executor = new
ExpressionRuleExecutor(ImmutableList.of(NullSafeEqualToEqual.INSTANCE));
+ SlotReference slot = new SlotReference("a", StringType.INSTANCE, true);
+ StringLiteral str = new StringLiteral("abc");
+ assertRewrite(new NullSafeEqual(slot, str), new EqualTo(slot, str));
+ }
+
+ // "A<=>B" not changed
+ @Test
+ void testNullSafeEqualNotChanged() {
+ executor = new
ExpressionRuleExecutor(ImmutableList.of(NullSafeEqualToEqual.INSTANCE));
+ SlotReference a = new SlotReference("a", StringType.INSTANCE, true);
+ SlotReference b = new SlotReference("b", StringType.INSTANCE, true);
+ assertRewrite(new NullSafeEqual(a, b), new NullSafeEqual(a, b));
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateJoinByUniqueTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateJoinByUniqueTest.java
index c3b9c9de005..778b65f992c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateJoinByUniqueTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateJoinByUniqueTest.java
@@ -56,13 +56,6 @@ class EliminateJoinByUniqueTest extends TestWithFeService
implements MemoPattern
.nonMatch(logicalJoin())
.printlnTree();
- sql = "select t1.id1 from t1 left outer join t2 on t1.id1 <=> t2.id2";
- PlanChecker.from(connectContext)
- .analyze(sql)
- .rewrite()
- .matches(logicalJoin())
- .printlnTree();
-
sql = "select t2.id2 from t1 left outer join t2 on t1.id1 = t2.id2";
PlanChecker.from(connectContext)
.analyze(sql)
@@ -80,13 +73,6 @@ class EliminateJoinByUniqueTest extends TestWithFeService
implements MemoPattern
.nonMatch(logicalJoin())
.printlnTree();
- sql = "select t1.id1 from t1 left outer join t2 on t1.id_null <=>
t2.id2";
- PlanChecker.from(connectContext)
- .analyze(sql)
- .rewrite()
- .matches(logicalJoin())
- .printlnTree();
-
sql = "select t2.id2 from t1 left outer join t2 on t1.id_null =
t2.id2";
PlanChecker.from(connectContext)
.analyze(sql)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]