This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cf073ec8ce6 [runtimefilter](nerieds)support Non equal runtime filter
for nested loop join #25193
cf073ec8ce6 is described below
commit cf073ec8ce6c7d5654249a8a699bc9185a822770
Author: minghong <[email protected]>
AuthorDate: Mon Oct 16 17:49:47 2023 +0800
[runtimefilter](nerieds)support Non equal runtime filter for nested loop
join #25193
---
.../glue/translator/PhysicalPlanTranslator.java | 3 +-
.../glue/translator/RuntimeFilterTranslator.java | 4 +-
.../processor/post/RuntimeFilterGenerator.java | 132 ++++++++++++++++++---
.../trees/plans/physical/AbstractPhysicalJoin.java | 28 +++++
.../trees/plans/physical/PhysicalHashJoin.java | 29 -----
.../plans/physical/PhysicalNestedLoopJoin.java | 23 ++--
.../trees/plans/physical/RuntimeFilter.java | 22 +++-
.../org/apache/doris/planner/DataStreamSink.java | 19 +--
.../java/org/apache/doris/planner/PlanNode.java | 19 +--
.../org/apache/doris/planner/RuntimeFilter.java | 66 +++++++++--
.../correctness_p0/test_runtime_filter.groovy | 111 +++++++++++++++++
11 files changed, 350 insertions(+), 106 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 9361b7c63e3..5e4097bbfec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -180,6 +180,7 @@ import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPushAggOp;
+import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -1392,7 +1393,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
.getRuntimeFilterOfHashJoinNode(nestedLoopJoin);
filters.forEach(filter -> runtimeFilterTranslator
.createLegacyRuntimeFilter(filter, nestedLoopJoinNode,
context));
- if (!filters.isEmpty()) {
+ if (filters.stream().anyMatch(filter -> filter.getType() ==
TRuntimeFilterType.BITMAP)) {
nestedLoopJoinNode.setOutputLeftSideOnly(true);
}
});
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 982c8677aa7..8ffd307e328 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -152,8 +152,8 @@ public class RuntimeFilterTranslator {
if (!hasInvalidTarget) {
org.apache.doris.planner.RuntimeFilter origFilter
=
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
- filter.getId(), node, src, filter.getExprOrder(),
targetExprList,
- targetTupleIdMapList, filter.getType(),
context.getLimits(), filter.getBuildSideNdv());
+ filter, node, src, targetExprList,
+ targetTupleIdMapList, context.getLimits());
if (node instanceof HashJoinNode) {
origFilter.setIsBroadcast(((HashJoinNode)
node).getDistributionMode() == DistributionMode.BROADCAST);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 2683754d77e..b99b8904e5e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -23,9 +23,14 @@ import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.stats.ExpressionEstimation;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.CTEId;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
+import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
+import org.apache.doris.nereids.trees.expressions.LessThan;
+import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
@@ -53,6 +58,7 @@ import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
@@ -113,6 +119,10 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
join.right().accept(this, context);
join.left().accept(this, context);
+ if
(RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) ||
join.isMarkJoin()) {
+ join.right().getOutput().forEach(slot ->
+
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
+ }
collectPushDownCTEInfos(join, context);
if (!getPushDownCTECandidates(ctx).isEmpty()) {
pushDownRuntimeFilterIntoCTE(ctx);
@@ -142,29 +152,19 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
return producer;
}
- @Override
- public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<?
extends Plan, ? extends Plan> join,
- CascadesContext context) {
- // TODO: we need to support all type join
- join.right().accept(this, context);
- join.left().accept(this, context);
+ private void generateBitMapRuntimeFilterForNLJ(PhysicalNestedLoopJoin<?
extends Plan, ? extends Plan> join,
+ RuntimeFilterContext ctx) {
if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN &&
join.getJoinType() != JoinType.CROSS_JOIN) {
- return join;
+ return;
}
- RuntimeFilterContext ctx = context.getRuntimeFilterContext();
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap =
ctx.getAliasTransferMap();
-
- if ((ctx.getSessionVariable().getRuntimeFilterType() &
TRuntimeFilterType.BITMAP.getValue()) == 0) {
- //only generate BITMAP filter for nested loop join
- return join;
- }
List<Slot> leftSlots = join.left().getOutput();
List<Slot> rightSlots = join.right().getOutput();
List<Expression> bitmapRuntimeFilterConditions =
JoinUtils.extractBitmapRuntimeFilterConditions(leftSlots,
rightSlots, join.getOtherJoinConjuncts());
if (!JoinUtils.extractExpressionForHashTable(leftSlots, rightSlots,
join.getOtherJoinConjuncts())
.first.isEmpty()) {
- return join;
+ return;
}
int bitmapRFCount = bitmapRuntimeFilterConditions.size();
for (int i = 0; i < bitmapRFCount; i++) {
@@ -193,6 +193,104 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
}
}
+ }
+
+    /**
+     * Orient a conjunct as "left-child slot OP right-child slot", e.g. for A join B, B.x < A.x becomes A.x > B.x;
+     * an already oriented predicate is returned unchanged, anything else yields null. Null-safe equal is rejected:
+     * NOTE(review) a min-max filter is assumed to drop NULL probe values, which {@code <=>} can match - confirm in BE.
+     */
+    private ComparisonPredicate normalizeNonEqual(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
+            Expression expr) {
+        if (!(expr instanceof ComparisonPredicate)
+                || expr instanceof org.apache.doris.nereids.trees.expressions.NullSafeEqual) {
+            return null;
+        }
+        Expression first = expr.child(0);
+        Expression second = expr.child(1);
+        if (!(first instanceof SlotReference) || !(second instanceof SlotReference)) {
+            return null;
+        }
+        List<Slot> leftOutput = join.left().getOutput();
+        List<Slot> rightOutput = join.right().getOutput();
+        if (leftOutput.contains(first) && rightOutput.contains(second)) {
+            return (ComparisonPredicate) expr;
+        }
+        if (leftOutput.contains(second) && rightOutput.contains(first)) {
+            return ((ComparisonPredicate) expr).commute();
+        }
+        return null;
+    }
+
+    private TMinMaxRuntimeFilterType getMinMaxType(ComparisonPredicate compare) {
+        // probe < build: only max(build) bounds the probe side; probe > build: only min(build) does
+        boolean probeBelowBuild = compare instanceof LessThan || compare instanceof LessThanEqual;
+        if (probeBelowBuild) {
+            return TMinMaxRuntimeFilterType.MAX;
+        }
+        boolean probeAboveBuild = compare instanceof GreaterThan || compare instanceof GreaterThanEqual;
+        return probeAboveBuild ? TMinMaxRuntimeFilterType.MIN : TMinMaxRuntimeFilterType.MIN_MAX;
+    }
+
+    /**
+     * A join B on A.x < B.y: a min-max filter (A.x < N, N = max(B.y)) can be applied to A.x.
+     * Considers every other-join conjunct that normalizes to "probe slot OP build slot".
+     */
+    private void generateMinMaxRuntimeFilter(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        int hashConditionSize = join.getHashJoinConjuncts().size();
+        for (int idx = 0; idx < join.getOtherJoinConjuncts().size(); idx++) {
+            int exprOrder = idx + hashConditionSize;
+            Expression expr = join.getOtherJoinConjuncts().get(idx); // exprOrder is offset by hash conjuncts
+            ComparisonPredicate compare = normalizeNonEqual(join, expr);
+            if (compare != null) {
+                Slot unwrappedSlot = checkTargetChild(compare.child(0));
+                if (unwrappedSlot == null) {
+                    continue;
+                }
+                Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(unwrappedSlot);
+                if (pair == null) {
+                    continue;
+                }
+                Slot olapScanSlot = pair.second;
+                PhysicalRelation scan = pair.first;
+                Preconditions.checkState(olapScanSlot != null && scan != null);
+                long buildSideNdv = getBuildSideNdv(join, compare);
+                RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                        compare.child(1), ImmutableList.of(olapScanSlot), ImmutableList.of(olapScanSlot),
+                        TRuntimeFilterType.MIN_MAX, exprOrder, join, true, buildSideNdv,
+                        getMinMaxType(compare));
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+                ctx.setTargetsOnScanNode(scan.getRelationId(), olapScanSlot);
+            }
+        }
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        // TODO: we need to support all type join
+        join.right().accept(this, context);
+        join.left().accept(this, context);
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        if (RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
+            // drop right-side slots from the alias map so no later filter targets them through this join
+            join.right().getOutput().forEach(slot -> ctx.getAliasTransferMap().remove(slot));
+            return join;
+        }
+        int enabledRfTypes = ctx.getSessionVariable().getRuntimeFilterType();
+        // the session variable is a bit mask of TRuntimeFilterType values
+        if ((enabledRfTypes & TRuntimeFilterType.BITMAP.getValue()) != 0) {
+            generateBitMapRuntimeFilterForNLJ(join, ctx);
+        }
+
+        if ((enabledRfTypes & TRuntimeFilterType.MIN_MAX.getValue()) != 0) {
+            generateMinMaxRuntimeFilter(join, ctx);
+        }
+
         return join;
     }
@@ -233,14 +331,16 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
return relation;
}
-    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+    // runtime filter build side ndv: NDV of compare.right() from the right child's stats, -1 if missing/unknown
+    private long getBuildSideNdv(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
+            ComparisonPredicate compare) {
         AbstractPlan right = (AbstractPlan) join.right();
         //make ut test friendly
         if (right.getStats() == null) {
             return -1L;
         }
         ExpressionEstimation estimator = new ExpressionEstimation();
-        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        ColumnStatistic buildColStats = compare.right().accept(estimator, right.getStats());
         return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
     }
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
index eb9ed7cfc97..ad7e8ba8cc8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
@@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Join;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
+import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
@@ -41,6 +42,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.stream.Collectors;
/**
* Abstract class for all physical join node.
@@ -214,4 +216,30 @@ public abstract class AbstractPhysicalJoin<
? ImmutableList.of(markJoinSlotReference.get()) :
ImmutableList.of())
.build();
}
+
+    @Override
+    public String toString() {
+        List<Object> args = Lists.newArrayList("type", joinType,
+                "hashCondition", hashJoinConjuncts,
+                "otherCondition", otherJoinConjuncts,
+                "stats", statistics);
+        if (markJoinSlotReference.isPresent()) {
+            args.add("isMarkJoin");
+            args.add("true");
+            args.add("MarkJoinSlotReference");
+            args.add(markJoinSlotReference.get());
+        }
+        if (hint != JoinHint.NONE) {
+            args.add("hint");
+            args.add(hint);
+        }
+        if (!runtimeFilters.isEmpty()) {
+            args.add("runtimeFilters");
+            args.add(runtimeFilters.stream().map(rf -> rf.toString() + " ").collect(Collectors.toList()));
+        }
+        // getSimpleName keeps the "PhysicalHashJoin[..." style prefix; getName would print the package too
+        return Utils.toSqlString(getClass().getSimpleName() + "[" + id.asInt() + "]" + getGroupIdWithPrefix(),
+                args.toArray());
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
index 6d9583504ea..b60afd67308 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
@@ -31,14 +31,12 @@ import
org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.MutableState;
-import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;
@@ -140,33 +138,6 @@ public class PhysicalHashJoin<
return visitor.visitPhysicalHashJoin(this, context);
}
- @Override
- public String toString() {
- List<Object> args = Lists.newArrayList("type", joinType,
- "hashJoinCondition", hashJoinConjuncts,
- "otherJoinCondition", otherJoinConjuncts,
- "stats", statistics,
- "fr", getMutableState(AbstractPlan.FRAGMENT_ID));
- if (markJoinSlotReference.isPresent()) {
- args.add("isMarkJoin");
- args.add("true");
- }
- if (markJoinSlotReference.isPresent()) {
- args.add("MarkJoinSlotReference");
- args.add(markJoinSlotReference.get());
- }
- if (hint != JoinHint.NONE) {
- args.add("hint");
- args.add(hint);
- }
- if (!runtimeFilters.isEmpty()) {
- args.add("runtimeFilters");
- args.add(runtimeFilters.stream().map(rf -> rf.toString() + "
").collect(Collectors.toList()));
- }
- return Utils.toSqlString("PhysicalHashJoin[" + id.asInt() + "]" +
getGroupIdWithPrefix(),
- args.toArray());
- }
-
@Override
public PhysicalHashJoin<Plan, Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 2);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
index d6b08d02c54..d2dca0d2e73 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
@@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.MutableState;
-import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
@@ -113,17 +112,17 @@ public class PhysicalNestedLoopJoin<
return visitor.visitPhysicalNestedLoopJoin(this, context);
}
- @Override
- public String toString() {
- // TODO: Maybe we could pull up this to the abstract class in the
future.
- return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() + "]"
+ getGroupIdWithPrefix(),
- "type", joinType,
- "otherJoinCondition", otherJoinConjuncts,
- "isMarkJoin", markJoinSlotReference.isPresent(),
- "markJoinSlotReference", markJoinSlotReference.isPresent() ?
markJoinSlotReference.get() : "empty",
- "stats", statistics
- );
- }
+ // @Override
+ // public String toString() {
+ // // TODO: Maybe we could pull up this to the abstract class in the
future.
+ // return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() +
"]" + getGroupIdWithPrefix(),
+ // "type", joinType,
+ // "otherJoinCondition", otherJoinConjuncts,
+ // "isMarkJoin", markJoinSlotReference.isPresent(),
+ // "markJoinSlotReference", markJoinSlotReference.isPresent()
? markJoinSlotReference.get() : "empty",
+ // "stats", statistics
+ // );
+ // }
@Override
public PhysicalNestedLoopJoin<Plan, Plan> withChildren(List<Plan>
children) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index 74a100dc237..f213a8996ce 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.collect.ImmutableList;
@@ -45,21 +46,31 @@ public class RuntimeFilter {
private final boolean bitmapFilterNotIn;
private final long buildSideNdv;
+ // use for min-max filter only. specify if the min or max side is valid
+ private final TMinMaxRuntimeFilterType tMinMaxType;
/**
* constructor
*/
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot>
targets, TRuntimeFilterType type,
int exprOrder, AbstractPhysicalJoin builderNode, long
buildSideNdv) {
- this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder,
builderNode, false, buildSideNdv);
+ this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder,
+ builderNode, false, buildSideNdv,
TMinMaxRuntimeFilterType.MIN_MAX);
+ }
+
+ public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot>
targets, List<Expression> targetExpressions,
+ TRuntimeFilterType type, int exprOrder,
AbstractPhysicalJoin builderNode,
+ boolean bitmapFilterNotIn, long buildSideNdv) {
+ this(id, src, targets, targetExpressions, type, exprOrder,
+ builderNode, bitmapFilterNotIn, buildSideNdv,
TMinMaxRuntimeFilterType.MIN_MAX);
}
/**
* constructor
*/
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot>
targets, List<Expression> targetExpressions,
- TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin
builderNode, boolean bitmapFilterNotIn,
- long buildSideNdv) {
+ TRuntimeFilterType type, int exprOrder,
AbstractPhysicalJoin builderNode,
+ boolean bitmapFilterNotIn, long buildSideNdv,
TMinMaxRuntimeFilterType tMinMaxType) {
this.id = id;
this.srcSlot = src;
this.targetSlots = Lists.newArrayList(targets);
@@ -69,9 +80,14 @@ public class RuntimeFilter {
this.builderNode = builderNode;
this.bitmapFilterNotIn = bitmapFilterNotIn;
this.buildSideNdv = buildSideNdv <= 0 ? -1L : buildSideNdv;
+ this.tMinMaxType = tMinMaxType;
builderNode.addRuntimeFilter(this);
}
+    public TMinMaxRuntimeFilterType gettMinMaxType() {
+        return tMinMaxType; // MIN, MAX or MIN_MAX; only meaningful when the filter type is MIN_MAX
+    }
+
public Expression getSrcExpr() {
return srcSlot;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index 8f5ce78304b..4d4a2c641a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -150,24 +150,7 @@ public class DataStreamSink extends DataSink {
}
List<String> filtersStr = new ArrayList<>();
for (RuntimeFilter filter : runtimeFilters) {
- StringBuilder filterStr = new StringBuilder();
- filterStr.append(filter.getFilterId());
- if (!isBrief) {
- filterStr.append("[");
- filterStr.append(filter.getType().toString().toLowerCase());
- filterStr.append("]");
- if (isBuildNode) {
- filterStr.append(" <- ");
- filterStr.append(filter.getSrcExpr().toSql());
-
filterStr.append("(").append(filter.getEstimateNdv()).append("/")
-
.append(filter.getExpectFilterSizeBytes()).append("/")
- .append(filter.getFilterSizeBytes()).append(")");
- } else {
- filterStr.append(" -> ");
-
filterStr.append(filter.getTargetExpr(getExchNodeId()).toSql());
- }
- }
- filtersStr.add(filterStr.toString());
+ filtersStr.add(filter.getExplainString(isBuildNode, isBrief,
getExchNodeId()));
}
return Joiner.on(", ").join(filtersStr) + "\n";
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 40cfe876b89..754b9fcfcb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1116,24 +1116,7 @@ public abstract class PlanNode extends
TreeNode<PlanNode> implements PlanStats {
}
List<String> filtersStr = new ArrayList<>();
for (RuntimeFilter filter : runtimeFilters) {
- StringBuilder filterStr = new StringBuilder();
- filterStr.append(filter.getFilterId());
- if (!isBrief) {
- filterStr.append("[");
- filterStr.append(filter.getType().toString().toLowerCase());
- filterStr.append("]");
- if (isBuildNode) {
- filterStr.append(" <- ");
- filterStr.append(filter.getSrcExpr().toSql());
-
filterStr.append("(").append(filter.getEstimateNdv()).append("/")
-
.append(filter.getExpectFilterSizeBytes()).append("/")
- .append(filter.getFilterSizeBytes()).append(")");
- } else {
- filterStr.append(" -> ");
- filterStr.append(filter.getTargetExpr(getId()).toSql());
- }
- }
- filtersStr.add(filterStr.toString());
+ filtersStr.add(filter.getExplainString(isBuildNode, isBrief,
getId()));
}
return Joiner.on(", ").join(filtersStr) + "\n";
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index c6781e7ed4c..d21f390c04d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
import org.apache.doris.thrift.TRuntimeFilterDesc;
import org.apache.doris.thrift.TRuntimeFilterType;
@@ -109,6 +110,8 @@ public final class RuntimeFilter {
private boolean useRemoteRfOpt = true;
+ private TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType;
+
/**
* Internal representation of a runtime filter target.
*/
@@ -142,8 +145,10 @@ public final class RuntimeFilter {
}
private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode,
Expr srcExpr, int exprOrder,
- List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>>
targetSlots, TRuntimeFilterType type,
- RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long
buildSizeNdv) {
+ List<Expr> origTargetExprs, List<Map<TupleId,
List<SlotId>>> targetSlots,
+ TRuntimeFilterType type,
+ RuntimeFilterGenerator.FilterSizeLimits
filterSizeLimits, long buildSizeNdv,
+ TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType) {
this.id = filterId;
this.builderNode = filterSrcNode;
this.srcExpr = srcExpr;
@@ -152,16 +157,27 @@ public final class RuntimeFilter {
this.targetSlotsByTid = ImmutableList.copyOf(targetSlots);
this.runtimeFilterType = type;
this.ndvEstimate = buildSizeNdv;
+ this.tMinMaxRuntimeFilterType = tMinMaxRuntimeFilterType;
computeNdvEstimate();
calculateFilterSize(filterSizeLimits);
}
+    private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr srcExpr, int exprOrder,
+            List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> targetSlots, TRuntimeFilterType type,
+            RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
+        this(filterId, filterSrcNode, srcExpr, exprOrder, origTargetExprs, // no min/max side given: use MIN_MAX
+                targetSlots, type, filterSizeLimits, buildSizeNdv, TMinMaxRuntimeFilterType.MIN_MAX);
+    }
+
// only for nereids planner
- public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id,
JoinNodeBase node, Expr srcExpr,
- int exprOrder, List<Expr> origTargetExprs, List<Map<TupleId,
List<SlotId>>> targetSlots,
- TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits
filterSizeLimits, long buildSizeNdv) {
- return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExprs,
- targetSlots, type, filterSizeLimits, buildSizeNdv);
+ public static RuntimeFilter fromNereidsRuntimeFilter(
+ org.apache.doris.nereids.trees.plans.physical.RuntimeFilter
nereidsFilter,
+ JoinNodeBase node, Expr srcExpr, List<Expr> origTargetExprs,
+ List<Map<TupleId, List<SlotId>>> targetSlots,
+ RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
+ return new RuntimeFilter(nereidsFilter.getId(), node, srcExpr,
nereidsFilter.getExprOrder(), origTargetExprs,
+ targetSlots, nereidsFilter.getType(), filterSizeLimits,
nereidsFilter.getBuildSideNdv(),
+ nereidsFilter.gettMinMaxType());
}
@Override
@@ -224,6 +240,9 @@ public final class RuntimeFilter {
tFilter.setBitmapTargetExpr(targets.get(0).expr.treeToThrift());
tFilter.setBitmapFilterNotIn(bitmapFilterNotIn);
}
+ if (runtimeFilterType.equals(TRuntimeFilterType.MIN_MAX)) {
+ tFilter.setMinMaxType(tMinMaxRuntimeFilterType);
+ }
tFilter.setOptRemoteRf(optRemoteRf);
return tFilter;
}
@@ -256,6 +275,18 @@ public final class RuntimeFilter {
return runtimeFilterType;
}
+    /** Lower-case type label for EXPLAIN; a one-sided MIN_MAX filter is shown as "min" or "max". */
+    public String getTypeDesc() {
+        boolean isMinMax = runtimeFilterType == TRuntimeFilterType.MIN_MAX;
+        if (isMinMax && tMinMaxRuntimeFilterType == TMinMaxRuntimeFilterType.MIN) {
+            return "min";
+        } else if (isMinMax && tMinMaxRuntimeFilterType == TMinMaxRuntimeFilterType.MAX) {
+            return "max";
+        }
+        // Locale.ROOT: under e.g. a Turkish default locale toLowerCase() would yield "mın_max", "bıtmap"
+        return runtimeFilterType.toString().toLowerCase(java.util.Locale.ROOT);
+    }
+
public void setType(TRuntimeFilterType type) {
runtimeFilterType = type;
}
@@ -685,4 +716,25 @@ public final class RuntimeFilter {
public long getExpectFilterSizeBytes() {
return expectFilterSizeBytes;
}
+
+    /** EXPLAIN text for this filter as seen from a build node ("<-" source) or a probe node ("->" target). */
+    public String getExplainString(boolean isBuildNode, boolean isBrief, PlanNodeId targetNodeId) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(getFilterId());
+        if (isBrief) {
+            return sb.toString();
+        }
+        sb.append('[').append(getTypeDesc()).append(']');
+        if (isBuildNode) {
+            // source expression, then (estimated ndv/expected filter bytes/filter bytes)
+            sb.append(" <- ").append(getSrcExpr().toSql())
+                    .append('(').append(getEstimateNdv()).append('/')
+                    .append(getExpectFilterSizeBytes()).append('/')
+                    .append(getFilterSizeBytes()).append(')');
+        } else {
+            // probe side: the expression this filter is applied to on targetNodeId
+            sb.append(" -> ").append(getTargetExpr(targetNodeId).toSql());
+        }
+        return sb.toString();
+    }
}
diff --git a/regression-test/suites/correctness_p0/test_runtime_filter.groovy
b/regression-test/suites/correctness_p0/test_runtime_filter.groovy
new file mode 100644
index 00000000000..f691bd16060
--- /dev/null
+++ b/regression-test/suites/correctness_p0/test_runtime_filter.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// These cases are copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
+// and modified by Doris.
+
+suite("test_runtime_filter") {
+
+    sql """ DROP TABLE IF EXISTS rf_tblA """
+    sql """
+        CREATE TABLE IF NOT EXISTS rf_tblA (
+            a int
+        )
+        DUPLICATE KEY(a)
+        DISTRIBUTED BY HASH(a) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """ DROP TABLE IF EXISTS rf_tblB """
+    sql """
+        CREATE TABLE IF NOT EXISTS rf_tblB (
+            b int
+        )
+        DUPLICATE KEY(b)
+        DISTRIBUTED BY HASH(b) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+    sql """ DROP TABLE IF EXISTS rf_tblC """
+    sql """
+        CREATE TABLE IF NOT EXISTS rf_tblC (
+            c int
+        )
+        DUPLICATE KEY(c)
+        DISTRIBUTED BY HASH(c) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    sql "set enable_pipeline_engine=true;"
+    sql "set runtime_filter_type=4" // 4 = MIN_MAX: only min/max filters are expected below
+    sql "set enable_nereids_planner=true"
+    sql "set enable_fallback_to_original_planner=false"
+    sql "set disable_join_reorder=true"
+
+    explain{
+        sql ("""select * from rf_tblA join rf_tblB on a < b""")
+        contains "runtime filters: RF000[max] -> a"
+        contains "runtime filters: RF000[max] <- b"
+    }
+
+    explain{
+        sql ("""select * from rf_tblA join rf_tblB on a > b""")
+        contains "runtime filters: RF000[min] -> a"
+        contains "runtime filters: RF000[min] <- b"
+    }
+
+    explain{
+        sql ("""select * from rf_tblA join rf_tblB on b < a""")
+        contains "runtime filters: RF000[min] -> a"
+        contains "runtime filters: RF000[min] <- b"
+    }
+
+    explain{
+        sql ("""select * from rf_tblA right outer join rf_tblB on a < b""")
+        contains "runtime filters: RF000[max] <- b"
+        contains "runtime filters: RF000[max] -> a"
+    }
+
+    explain{
+        sql ("""select * from rf_tblA left join rf_tblB on a < b; """)
+        notContains "runtime filters"
+    }
+
+    explain{
+        sql ("""select * from rf_tblA full outer join rf_tblB on a = b; """)
+        notContains "runtime filters"
+    }
+
+    explain{
+        sql ("""
+            with x as (select * from rf_tblA join rf_tblB on a=b)
+            select * from x join rf_tblC on x.b <= rf_tblC.c
+            union
+            select * from x join rf_tblC on x.b <= rf_tblC.c
+            """)
+        contains "runtime filters: RF001[max] -> b"
+        contains "runtime filters: RF002[max] -> b"
+        contains "runtime filters: RF001[max] <- c"
+        contains "runtime filters: RF002[max] <- c"
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]