chenhao7253886 commented on a change in pull request #1471: Planner support
push down predicates past agg, win and sort
URL: https://github.com/apache/incubator-doris/pull/1471#discussion_r315050331
##########
File path: fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
##########
@@ -1545,5 +1625,192 @@ private void
materializeInlineViewResultExprForCrossJoinOrCountStar(InlineViewRe
}
}
+ /**
+
------------------------------------------------------------------------------
+ */
+ /**
+ * Push down predicates rules
+ */
+
+ /**
+ * Entrance for push-down rules, it will execute possible push-down rules
from top to down
+ * and the planner will be responsible for assigning all predicates to
PlanNode.
+ */
+ private void pushDownPredicates(Analyzer analyzer, SelectStmt stmt) throws
AnalysisException {
+ pushDownPredicatesPastSort(analyzer, stmt);
+ pushDownPredicatesPastWindows(analyzer, stmt);
+ pushDownPredicatesPastAggregation(analyzer, stmt);
+ }
+
+ private void pushDownPredicatesPastSort(Analyzer analyzer, SelectStmt
stmt) throws AnalysisException {
+ // TODO chenhao, remove isEvaluateOrderBy when SubQuery's default
limit is removed.
+ if (stmt.evaluateOrderBy() || stmt.getLimit() >= 0 || stmt.getOffset()
> 0 || stmt.getSortInfo() == null) {
+ return;
+ }
+
+ final List<Expr> predicates = getBoundPredicates(analyzer,
stmt.getSortInfo().getSortTupleDescriptor());
+ if (predicates.size() <= 0) {
+ return;
+ }
+
+ final List<Expr> pushDownPredicates =
getPredicatesReplacedSlotWithSourceExpr(predicates, analyzer);
+ if (pushDownPredicates.size() <= 0) {
+ return;
+ }
+
+ if (putPredicatesOnWindows(stmt, analyzer, pushDownPredicates)) {
+ return;
+ }
+ if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
+ return;
+ }
+
+ putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+ }
+
+ private void pushDownPredicatesPastWindows(Analyzer analyzer, SelectStmt
stmt) throws AnalysisException {
+ final AnalyticInfo analyticInfo = stmt.getAnalyticInfo();
+ if (analyticInfo == null ||
analyticInfo.getCommonPartitionExprs().size() == 0) {
+ return;
+ }
+
+ final List<Expr> predicates = getBoundPredicates(analyzer,
analyticInfo.getOutputTupleDesc());
+ if (predicates.size() <= 0) {
+ return;
+ }
+
+ final List<Expr> pushDownPredicates =
getPredicatesBoundedByGroupbysAndReplaceSlotWithSourceExpr(predicates,
analyzer);
+ if (pushDownPredicates.size() <= 0) {
+ return;
+ }
+
+ if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
+ return;
+ }
+
+ putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+ }
+
+ private void pushDownPredicatesPastAggregation(Analyzer analyzer,
SelectStmt stmt) throws AnalysisException {
+ final AggregateInfo aggregateInfo = stmt.getAggInfo();
+ if (aggregateInfo == null || aggregateInfo.getGroupingExprs().size()
<= 0) {
+ return;
+ }
+
+ final List<Expr> predicates = getBoundPredicates(analyzer,
aggregateInfo.getOutputTupleDesc());
+ if (predicates.size() <= 0) {
+ return;
+ }
+
+ final List<Expr> pushDownPredicates =
getPredicatesBoundedByGroupbysAndReplaceSlotWithSourceExpr(predicates,
analyzer);
+ if (pushDownPredicates.size() <= 0) {
+ return;
+ }
+
+ putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+ }
+
+ private List<Expr>
getPredicatesBoundedByGroupbysAndReplaceSlotWithSourceExpr(List<Expr>
predicates, Analyzer analyzer) {
+ final List<Expr> predicatesCanPushDown = Lists.newArrayList();
+ for (Expr predicate : predicates) {
+ if (predicate.isConstant()) {
+ // Constant predicates can't be pushed down past Groupby.
+ continue;
+ }
+
+ final List<TupleId> tupleIds = Lists.newArrayList();
+ final List<SlotId> slotIds = Lists.newArrayList();
+ predicate.getIds(tupleIds, slotIds);
+
+ boolean isAllSlotReferingGroupBys = true;
+ for (SlotId slotId : slotIds) {
+ final SlotDescriptor slotDesc =
analyzer.getDescTbl().getSlotDesc(slotId);
+ if (slotDesc.getSourceExprs().get(0).getFn() instanceof
AggregateFunction) {
+ isAllSlotReferingGroupBys = false;
+ }
+ }
+
+ if (isAllSlotReferingGroupBys) {
+ predicatesCanPushDown.add(predicate);
+ }
+ }
+ return getPredicatesReplacedSlotWithSourceExpr(predicatesCanPushDown,
analyzer);
+ }
+
+ private List<Expr> getPredicatesReplacedSlotWithSourceExpr(List<Expr>
predicates, Analyzer analyzer) {
+ final List<Expr> predicatesCanPushDown = Lists.newArrayList();
+ analyzer.markConjunctsAssigned(predicates);
+ for (Expr predicate : predicates) {
+ final Expr newPredicate = predicate.clone();
+ replacePredicateSlotRefWithSlotRefSource(newPredicate, analyzer);
+ predicatesCanPushDown.add(newPredicate);
+ }
+ return predicatesCanPushDown;
+ }
+
+ private void replacePredicateSlotRefWithSlotRefSource(Expr predicate,
Analyzer analyzer) {
+ replacePredicateSlotRefWithSlotRefSource(null, predicate, -1,
analyzer);
+ }
+
+ private void replacePredicateSlotRefWithSlotRefSource(Expr parent, Expr
predicate, int childIndex, Analyzer analyzer) {
+ if (predicate instanceof SlotRef) {
+ final SlotRef slotRef = (SlotRef)predicate;
+ if (parent != null && childIndex >= 0) {
+ final Expr newReplacedExpr =
slotRef.getDesc().getSourceExprs().get(0).clone();
+ parent.setChild(childIndex, newReplacedExpr);
+ }
+ }
+
+ for (int i = 0; i < predicate.getChildren().size(); i++) {
+ final Expr child = predicate.getChild(i);
+ replacePredicateSlotRefWithSlotRefSource(predicate, child, i,
analyzer);
+ }
+ }
+
+ private boolean putPredicatesOnAggregation(SelectStmt stmt, Analyzer
analyzer,
+ List<Expr> predicates) throws
AnalysisException {
+ final AggregateInfo aggregateInfo = stmt.getAggInfo();
+ if (aggregateInfo != null) {
+ analyzer.registerConjuncts(predicates,
aggregateInfo.getOutputTupleId());
+ return true;
+ }
+ return false;
+ }
+
+ private boolean putPredicatesOnWindows(SelectStmt stmt, Analyzer analyzer,
+ List<Expr> predicates) throws
AnalysisException {
+ final AnalyticInfo analyticInfo = stmt.getAnalyticInfo();
+ if (analyticInfo != null) {
+ analyzer.registerConjuncts(predicates,
analyticInfo.getOutputTupleId());
+ return true;
+ }
+ return false;
+ }
+
+ private boolean putPredicatesOnFrom(SelectStmt stmt, Analyzer analyzer,
List<Expr> predicates) throws AnalysisException {
Review comment:
Ok, i will remove it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]