This is an automated email from the ASF dual-hosted git repository.
lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1378e7e05f (Refactor)[Planner] Remove merge node (#9251)
1378e7e05f is described below
commit 1378e7e05ff5849976705e70437fd05e2b8fda84
Author: EmmyMiao87 <[email protected]>
AuthorDate: Thu Apr 28 15:05:35 2022 +0800
(Refactor)[Planner] Remove merge node (#9251)
---
.../apache/doris/planner/DistributedPlanner.java | 63 ------
.../java/org/apache/doris/planner/MergeNode.java | 224 ---------------------
2 files changed, 287 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 215790e7d1..4ee3106057 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -213,8 +213,6 @@ public class DistributedPlanner {
result = createSelectNodeFragment((SelectNode) root,
childFragments);
} else if (root instanceof SetOperationNode) {
result = createSetOperationNodeFragment((SetOperationNode) root,
childFragments, fragments);
- } else if (root instanceof MergeNode) {
- result = createMergeNodeFragment((MergeNode) root, childFragments, fragments);
} else if (root instanceof AggregationNode) {
result = createAggregationFragment((AggregationNode) root,
childFragments.get(0), fragments);
} else if (root instanceof SortNode) {
@@ -692,67 +690,6 @@ public class DistributedPlanner {
return leftChildFragment;
}
- /**
- * Creates an unpartitioned fragment that merges the outputs of all of its
children (with a single ExchangeNode),
- * corresponding to the 'mergeNode' of the non-distributed plan. Each of
the child fragments receives a MergeNode as
- * a new plan root (with the child fragment's plan tree as its only
input), so that each child fragment's output is
- * mapped onto the MergeNode's result tuple id. TODO: if this is
implementing a UNION DISTINCT, the parent of the
- * mergeNode is a duplicate-removing AggregationNode, which might make
sense to apply to the children as well, in
- * order to reduce the amount of data that needs to be sent to the parent;
augment the planner to decide whether
- * that would reduce the runtime. TODO: since the fragment that does the
merge is unpartitioned, it can absorb all
- * child fragments that are also unpartitioned
- */
- private PlanFragment createMergeNodeFragment(MergeNode mergeNode,
- ArrayList<PlanFragment>
childFragments,
- ArrayList<PlanFragment>
fragments)
- throws UserException {
- Preconditions.checkState(mergeNode.getChildren().size() ==
childFragments.size());
-
- // If the mergeNode only has constant exprs, return it in an
unpartitioned fragment.
- if (mergeNode.getChildren().isEmpty()) {
- Preconditions.checkState(!mergeNode.getConstExprLists().isEmpty());
- return new PlanFragment(ctx_.getNextFragmentId(), mergeNode,
DataPartition.UNPARTITIONED);
- }
-
- // create an ExchangeNode to perform the merge operation of mergeNode;
- // the ExchangeNode retains the generic PlanNode parameters of
mergeNode
- ExchangeNode exchNode = new ExchangeNode(ctx_.getNextNodeId(),
mergeNode, true);
- exchNode.setNumInstances(1);
- exchNode.init(ctx_.getRootAnalyzer());
- PlanFragment parentFragment =
- new PlanFragment(ctx_.getNextFragmentId(), exchNode,
DataPartition.UNPARTITIONED);
-
- // we don't expect to be paralleling a MergeNode that was inserted
solely
- // to evaluate conjuncts (ie, that doesn't explicitly materialize its
output)
- Preconditions.checkState(mergeNode.getTupleIds().size() == 1);
-
- for (int i = 0; i < childFragments.size(); ++i) {
- PlanFragment childFragment = childFragments.get(i);
- // create a clone of mergeNode; we want to keep the limit and
conjuncts
- MergeNode childMergeNode = new MergeNode(ctx_.getNextNodeId(),
mergeNode);
- List<Expr> resultExprs =
Expr.cloneList(mergeNode.getResultExprLists().get(i), null);
- childMergeNode.addChild(childFragment.getPlanRoot(), resultExprs);
- childFragment.setPlanRoot(childMergeNode);
- childFragment.setDestination(exchNode);
- }
-
- // Add an unpartitioned child fragment with a MergeNode for the
constant exprs.
- if (!mergeNode.getConstExprLists().isEmpty()) {
- MergeNode childMergeNode = new MergeNode(ctx_.getNextNodeId(),
mergeNode);
- childMergeNode.init(ctx_.getRootAnalyzer());
- childMergeNode.getConstExprLists().addAll(mergeNode.getConstExprLists());
- // Clear original constant exprs to make sure nobody else picks
them up.
- mergeNode.getConstExprLists().clear();
- PlanFragment childFragment =
- new PlanFragment(ctx_.getNextFragmentId(), childMergeNode,
DataPartition.UNPARTITIONED);
- childFragment.setPlanRoot(childMergeNode);
- childFragment.setDestination(exchNode);
- childFragments.add(childFragment);
- fragments.add(childFragment);
- }
- return parentFragment;
- }
-
/**
* Returns a new fragment with a UnionNode as its root. The data partition
of the
* returned fragment and how the data of the child fragments is consumed
depends on the
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java
deleted file mode 100644
index bb307021ef..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java
+++ /dev/null
@@ -1,224 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotId;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.analysis.TupleId;
-import org.apache.doris.common.UserException;
-import org.apache.doris.thrift.TExplainLevel;
-import org.apache.doris.thrift.TExpr;
-import org.apache.doris.thrift.TMergeNode;
-import org.apache.doris.thrift.TPlanNode;
-import org.apache.doris.thrift.TPlanNodeType;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
-
-import java.util.List;
-
-/**
- * Node that merges the results of its child plans by materializing
- * the corresponding result exprs.
- * If no result exprs are specified for a child, it simply passes on the
child's
- * results.
- */
-public class MergeNode extends PlanNode {
- private final static Logger LOG = LogManager.getLogger(MergeNode.class);
-
- // Expr lists corresponding to the input query stmts.
- // The ith resultExprList belongs to the ith child.
- // All exprs are resolved to base tables.
- protected List<List<Expr>> resultExprLists = Lists.newArrayList();
-
- // Expr lists that originate from constant select stmts.
- // We keep them separate from the regular expr lists to avoid null
children.
- protected List<List<Expr>> constExprLists = Lists.newArrayList();
-
- // Output tuple materialized by this node.
- protected final List<TupleDescriptor> tupleDescs = Lists.newArrayList();
-
- protected final TupleId tupleId;
-
- protected MergeNode(PlanNodeId id, MergeNode node) {
- super(id, node, "MERGE");
- this.tupleId = node.tupleId;
- }
-
- public void addConstExprList(List<Expr> exprs) {
- constExprLists.add(exprs);
- }
-
- public void addChild(PlanNode node, List<Expr> resultExprs) {
- addChild(node);
- resultExprLists.add(resultExprs);
- if (resultExprs != null) {
- // if we're materializing output, we can only do that into a single
- // output tuple
- Preconditions.checkState(tupleIds.size() == 1);
- }
- }
-
- /**
- * This must be called *after* addChild()/addConstExprList() because it
recomputes
- * both of them.
- * The MergeNode doesn't need an smap: like a ScanNode, it materializes an
"original"
- * tuple id
- */
- @Override
- public void init(Analyzer analyzer) throws UserException {
- assignConjuncts(analyzer);
- //computeMemLayout(analyzer);
- computeStats(analyzer);
- Preconditions.checkState(resultExprLists.size() ==
getChildren().size());
-
- // drop resultExprs/constExprs that aren't getting materialized (=
where the
- // corresponding output slot isn't being materialized)
- List<SlotDescriptor> slots =
analyzer.getDescTbl().getTupleDesc(tupleId).getSlots();
- List<List<Expr>> newResultExprLists = Lists.newArrayList();
- //
- // for (int i = 0; i < resultExprLists.size(); ++i) {
- // List<Expr> exprList = resultExprLists.get(i);
- // List<Expr> newExprList = Lists.newArrayList();
- // for (int j = 0; j < exprList.size(); ++j) {
- // if (slots.get(j).isMaterialized())
newExprList.add(exprList.get(j));
- // }
- // newResultExprLists.add(newExprList);
- // newResultExprLists.add(
- // Expr.substituteList(newExprList,
getChild(i).getOutputSmap(), analyzer, true));
- // }
- // resultExprLists = newResultExprLists;
- //
- Preconditions.checkState(resultExprLists.size() ==
getChildren().size());
-
- List<List<Expr>> newConstExprLists = Lists.newArrayList();
- for (List<Expr> exprList: constExprLists) {
- List<Expr> newExprList = Lists.newArrayList();
- for (int i = 0; i < exprList.size(); ++i) {
- if (slots.get(i).isMaterialized())
newExprList.add(exprList.get(i));
- }
- newConstExprLists.add(newExprList);
- }
- constExprLists = newConstExprLists;
- }
-
- @Override
- public void computeStats(Analyzer analyzer) {
- super.computeStats(analyzer);
- if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
- return;
- }
- cardinality = constExprLists.size();
- for (PlanNode child : children) {
- // ignore missing child cardinality info in the hope it won't
matter enough
- // to change the planning outcome
- if (child.cardinality > 0) {
- cardinality += child.cardinality;
- }
- }
- applyConjunctsSelectivity();
- capCardinalityAtLimit();
- if (LOG.isDebugEnabled()) {
- LOG.debug("stats Merge: cardinality={}",
Long.toString(cardinality));
- }
- }
-
- @Override
- protected void computeOldCardinality() {
- cardinality = constExprLists.size();
- for (PlanNode child : children) {
- // ignore missing child cardinality info in the hope it won't
matter enough
- // to change the planning outcome
- if (child.cardinality > 0) {
- cardinality += child.cardinality;
- }
- }
- LOG.debug("stats Merge: cardinality={}", Long.toString(cardinality));
- }
-
- public List<List<Expr>> getResultExprLists() {
- return resultExprLists;
- }
-
- public List<List<Expr>> getConstExprLists() {
- return constExprLists;
- }
-
- @Override
- protected void toThrift(TPlanNode msg) {
- List<List<TExpr>> texprLists = Lists.newArrayList();
- List<List<TExpr>> constTexprLists = Lists.newArrayList();
- for (List<Expr> exprList : resultExprLists) {
- if (exprList != null) {
- texprLists.add(Expr.treesToThrift(exprList));
- }
- }
- for (List<Expr> constTexprList : constExprLists) {
- constTexprLists.add(Expr.treesToThrift(constTexprList));
- }
- msg.merge_node = new TMergeNode(tupleId.asInt(), texprLists,
constTexprLists);
- msg.node_type = TPlanNodeType.MERGE_NODE;
- }
-
- @Override
- public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
- if (detailLevel == TExplainLevel.BRIEF) {
- return "";
- }
- StringBuilder output = new StringBuilder();
- // A MergeNode may have predicates if a union is used inside an inline
view,
- // and the enclosing select stmt has predicates referring to the
inline view.
- if (!conjuncts.isEmpty()) {
- output.append(prefix + "predicates: " +
getExplainString(conjuncts) + "\n");
- }
- if (constExprLists.size() > 0) {
- output.append(prefix + "merging " + constExprLists.size() + " SELECT CONSTANT\n");
- }
- return output.toString();
- }
-
- @Override
- public void getMaterializedIds(Analyzer analyzer, List<SlotId> ids) {
- super.getMaterializedIds(analyzer, ids);
-
- for (List<Expr> resultExprs : resultExprLists) {
- Expr.getIds(resultExprs, null, ids);
- }
-
- // for now, also mark all of our output slots as materialized
- // TODO: fix this, it's not really necessary, but it breaks the logic
- // in MergeNode (c++)
- for (TupleId tupleId : tupleIds) {
- TupleDescriptor tupleDesc = analyzer.getTupleDesc(tupleId);
- for (SlotDescriptor slotDesc : tupleDesc.getSlots()) {
- ids.add(slotDesc.getId());
- }
- }
- }
-
- @Override
- public int getNumInstances() {
- return 1;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]