This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/rel-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 81f30668ef71278a20c760ed688542b9982a4009 Author: Beyyes <[email protected]> AuthorDate: Tue Sep 26 18:03:53 2023 +0800 Fix the dead lock bug when the generated distributed query plan contains cycle --- .../distribution/DistributionPlanContext.java | 5 +- .../planner/distribution/DistributionPlanner.java | 6 +- .../planner/distribution/ExchangeNodeAdder.java | 72 ++++++++++---------- .../planner/distribution/NodeGroupContext.java | 18 +++-- .../plan/planner/distribution/SourceRewriter.java | 76 ++++++++++------------ .../node/process/last/LastQueryTransformNode.java | 7 ++ .../planner/plan/node/sink/IdentitySinkNode.java | 5 ++ .../plan/node/source/AlignedSeriesScanNode.java | 4 +- 8 files changed, 101 insertions(+), 92 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java index 87efdf508b7..ce6b25c26dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java @@ -41,7 +41,6 @@ public class DistributionPlanContext { protected DistributionPlanContext(MPPQueryContext queryContext) { this.isRoot = true; this.queryContext = queryContext; - this.forceAddParent = false; } protected DistributionPlanContext copy() { @@ -53,8 +52,8 @@ public class DistributionPlanContext { return this; } - protected void setForceAddParent(boolean forceAddParent) { - this.forceAddParent = forceAddParent; + protected void setForceAddParent() { + this.forceAddParent = true; } public void setOneSeriesInMultiRegion(boolean oneSeriesInMultiRegion) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java index 087ce5338eb..f0e016a5e52 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java @@ -81,11 +81,7 @@ public class DistributionPlanner { public PlanNode addExchangeNode(PlanNode root) { ExchangeNodeAdder adder = new ExchangeNodeAdder(this.analysis); NodeGroupContext nodeGroupContext = - new NodeGroupContext( - context, - analysis.getStatement() instanceof QueryStatement - && (((QueryStatement) analysis.getStatement()).isAlignByDevice()), - root); + new NodeGroupContext(context, analysis.getStatement(), root); PlanNode newRoot = adder.visit(root, nodeGroupContext); adjustUpStream(newRoot, nodeGroupContext); return newRoot; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index 6f2cce9a83f..779f101a52b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -59,8 +59,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -107,11 +105,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { private PlanNode internalVisitSchemaMerge( AbstractSchemaMergeNode node, NodeGroupContext context) { - node.getChildren() - .forEach( - child -> { - visit(child, context); - }); + node.getChildren().forEach(child -> visit(child, context)); NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN); PlanNode newNode = node.clone(); @@ -287,42 +281,38 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { } MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone(); - List<PlanNode> visitedChildren = new ArrayList<>(); - node.getChildren().forEach(child -> visitedChildren.add(visit(child, context))); + List<PlanNode> visitedChildren = + node.getChildren().stream() + .map(child -> visit(child, context)) + .collect(Collectors.toList()); TRegionReplicaSet dataRegion; - NodeDistributionType distributionType; + boolean isChildrenDistributionSame = nodeDistributionIsSame(visitedChildren, context); + NodeDistributionType distributionType = + isChildrenDistributionSame + ? NodeDistributionType.SAME_WITH_ALL_CHILDREN + : NodeDistributionType.SAME_WITH_SOME_CHILD; if (context.isAlignByDevice()) { // For align by device, // if dataRegions of children are the same, we set child's dataRegion to this node, // else we set the selected mostlyUsedDataRegion to this node - boolean inSame = nodeDistributionIsSame(visitedChildren, context); dataRegion = - inSame + isChildrenDistributionSame ? context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region : context.getMostlyUsedDataRegion(); context.putNodeDistribution( - newNode.getPlanNodeId(), - new NodeDistribution( - inSame - ? NodeDistributionType.SAME_WITH_ALL_CHILDREN - : NodeDistributionType.SAME_WITH_SOME_CHILD, - dataRegion)); + newNode.getPlanNodeId(), new NodeDistribution(distributionType, dataRegion)); } else { // TODO For align by time, we keep old logic for now dataRegion = calculateDataRegionByChildren(visitedChildren, context); - distributionType = - nodeDistributionIsSame(visitedChildren, context) - ? NodeDistributionType.SAME_WITH_ALL_CHILDREN - : NodeDistributionType.SAME_WITH_SOME_CHILD; context.putNodeDistribution( newNode.getPlanNodeId(), new NodeDistribution(distributionType, dataRegion)); + } - // If the distributionType of all the children are same, no ExchangeNode need to be added. - if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) { - newNode.setChildren(visitedChildren); - return newNode; - } + // If the distributionType of all the children are same, no ExchangeNode need to be added. + if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) { + newNode.setChildren(visitedChildren); + return newNode; } // Otherwise, we need to add ExchangeNode for the child whose DataRegion is different from the @@ -383,6 +373,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { private TRegionReplicaSet calculateDataRegionByChildren( List<PlanNode> children, NodeGroupContext context) { + // Step 1: calculate the count of children group by DataRegion. Map<TRegionReplicaSet, Long> groupByRegion = children.stream() @@ -399,16 +390,27 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return region; }, Collectors.counting())); - if (groupByRegion.entrySet().size() == 1) { - return groupByRegion.entrySet().iterator().next().getKey(); + + if (groupByRegion.size() == 1) { + return groupByRegion.keySet().iterator().next(); } + // Step 2: return the RegionReplicaSet with max count - return Collections.max( - groupByRegion.entrySet().stream() - .filter(e -> e.getKey() != DataPartition.NOT_ASSIGNED) - .collect(Collectors.toList()), - Map.Entry.comparingByValue()) - .getKey(); + long maxRegionCount = -1; + TRegionReplicaSet result = null; + for (Map.Entry<TRegionReplicaSet, Long> entry : groupByRegion.entrySet()) { + if (DataPartition.NOT_ASSIGNED.equals(entry.getKey())) { + continue; + } + if (entry.getKey().equals(context.getMostlyUsedDataRegion())) { + return entry.getKey(); + } + if (entry.getValue() > maxRegionCount) { + maxRegionCount = entry.getValue(); + result = entry.getKey(); + } + } + return result; } private TRegionReplicaSet calculateSchemaRegionByChildren( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java index 14799292714..9ea36e7994c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java @@ -25,23 +25,31 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class NodeGroupContext { + protected final MPPQueryContext queryContext; private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap; - private final boolean isAlignByDevice; - private final TRegionReplicaSet mostlyUsedDataRegion; + private boolean isAlignByDevice; + private TRegionReplicaSet mostlyUsedDataRegion; protected boolean hasExchangeNode; - public NodeGroupContext(MPPQueryContext queryContext, boolean isAlignByDevice, PlanNode root) { + public NodeGroupContext(MPPQueryContext queryContext, Statement statement, PlanNode root) { this.queryContext = queryContext; this.nodeDistributionMap = new HashMap<>(); - this.isAlignByDevice = isAlignByDevice; - this.mostlyUsedDataRegion = isAlignByDevice ? getMostlyUsedDataRegion(root) : null; + if (statement instanceof QueryStatement) { + this.isAlignByDevice = ((QueryStatement) statement).isAlignByDevice(); + this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root); + } else if (statement instanceof ShowTimeSeriesStatement) { + this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root); + } this.hasExchangeNode = false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index 91e818cb43c..ec481a98566 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -63,7 +63,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDe import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; -import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import java.util.ArrayList; import java.util.Collections; @@ -625,8 +624,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte public List<PlanNode> visitLastQuery(LastQueryNode node, DistributionPlanContext context) { // For last query, we need to keep every FI's root node is LastQueryMergeNode. So we // force every region group have a parent node even if there is only 1 child for it. - context.setForceAddParent(true); - PlanNode root = processRawMultiChildNode(node, context); + context.setForceAddParent(); + PlanNode root = processRawMultiChildNode(node, context, true); if (context.queryMultiRegion) { PlanNode newRoot = genLastQueryRootNode(node, context); // add sort op for each if we add LastQueryMergeNode as root @@ -698,11 +697,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte if (containsAggregationSource(node)) { return planAggregationWithTimeJoin(node, context); } - return Collections.singletonList(processRawMultiChildNode(node, context)); + return Collections.singletonList(processRawMultiChildNode(node, context, false)); } private PlanNode processRawMultiChildNode( - MultiChildProcessNode node, DistributionPlanContext context) { + MultiChildProcessNode node, DistributionPlanContext context, boolean isLast) { MultiChildProcessNode root = (MultiChildProcessNode) node.clone(); // Step 1: Get all source nodes. For the node which is not source, add it as the child of // current TimeJoinNode @@ -711,9 +710,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte if (child instanceof SeriesSourceNode) { // If the child is SeriesScanNode, we need to check whether this node should be seperated // into several splits. - SeriesSourceNode handle = (SeriesSourceNode) child; + SeriesSourceNode sourceNode = (SeriesSourceNode) child; List<TRegionReplicaSet> dataDistribution = - analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter()); + analysis.getPartitionInfo( + sourceNode.getPartitionPath(), sourceNode.getPartitionTimeFilter()); if (dataDistribution.size() > 1) { // We mark this variable to `true` if there is some series which is distributed in multi // DataRegions @@ -722,17 +722,17 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte // If the size of dataDistribution is m, this SeriesScanNode should be seperated into m // SeriesScanNode. for (TRegionReplicaSet dataRegion : dataDistribution) { - SeriesSourceNode split = (SeriesSourceNode) handle.clone(); + SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone(); split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); split.setRegionReplicaSet(dataRegion); sources.add(split); } } } + // Step 2: For the source nodes, group them by the DataRegion. Map<TRegionReplicaSet, List<SourceNode>> sourceGroup = sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet)); - if (sourceGroup.size() > 1) { context.setQueryMultiRegion(true); } @@ -741,35 +741,31 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte // and make the // new TimeJoinNode as the child of current TimeJoinNode // TODO: (xingtanzjr) optimize the procedure here to remove duplicated TimeJoinNode - final boolean[] addParent = {false}; - sourceGroup.forEach( - (dataRegion, seriesScanNodes) -> { - if (seriesScanNodes.size() == 1 && !context.forceAddParent) { - root.addChild(seriesScanNodes.get(0)); - } else { - // If there is only one RegionGroup here, we should not create new MultiChildNode as the - // parent. - // If the size of RegionGroup is larger than 1, we need to consider the value of - // `forceAddParent`. - // If `forceAddParent` is true, we should not create new MultiChildNode as the parent, - // either. - // At last, we can use the parameter `addParent[0]` to judge whether to create new - // MultiChildNode. - boolean appendToRootDirectly = - sourceGroup.size() == 1 || (!addParent[0] && !context.forceAddParent); - if (appendToRootDirectly) { - seriesScanNodes.forEach(root::addChild); - addParent[0] = true; - } else { - // We clone a TimeJoinNode from root to make the params to be consistent. - // But we need to assign a new ID to it - MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) root.clone(); - parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); - seriesScanNodes.forEach(parentOfGroup::addChild); - root.addChild(parentOfGroup); - } - } - }); + boolean addParent = false; + for (List<SourceNode> seriesScanNodes : sourceGroup.values()) { + if (seriesScanNodes.size() == 1 && (!context.forceAddParent || !isLast)) { + root.addChild(seriesScanNodes.get(0)); + continue; + } + // If size of RegionGroup = 1, we should not create new MultiChildNode as the parent. + // If size of RegionGroup > 1, we need to consider the value of `forceAddParent`. + // If `forceAddParent` is true, we should not create new MultiChildNode as the parent, either. + // At last, we can use the parameter `addParent` to judge whether to create new + // MultiChildNode. + boolean appendToRootDirectly = + sourceGroup.size() == 1 || (!addParent && !context.forceAddParent); + if (appendToRootDirectly) { + seriesScanNodes.forEach(root::addChild); + addParent = true; + } else { + // We clone a TimeJoinNode from root to make the params to be consistent. + // But we need to assign a new ID to it + MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) root.clone(); + parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); + seriesScanNodes.forEach(parentOfGroup::addChild); + root.addChild(parentOfGroup); + } + } // Process the other children which are not SeriesSourceNode for (PlanNode child : node.getChildren()) { @@ -784,10 +780,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return root; } - private boolean isAggregationQuery() { - return ((QueryStatement) analysis.getStatement()).isAggregationQuery(); - } - private boolean containsAggregationSource(TimeJoinNode node) { for (PlanNode child : node.getChildren()) { if (child instanceof SeriesAggregationScanNode diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java index fb2133f7dc2..108189259e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java @@ -108,6 +108,13 @@ public class LastQueryTransformNode extends SingleChildProcessNode { return Objects.hash(super.hashCode(), viewPath, dataType); } + @Override + public String toString() { + return String.format( + "LastQueryTransformNode-%s:[ViewPath: %s, DataType: %s]", + this.getPlanNodeId(), viewPath, dataType); + } + public String getViewPath() { return this.viewPath; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java index 7b7b5e8b500..b2197609e84 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java @@ -87,6 +87,11 @@ public class IdentitySinkNode extends MultiChildrenSinkNode { } } + @Override + public String toString() { + return String.format("IdentitySinkNode-%s", this.getPlanNodeId()); + } + public static IdentitySinkNode deserialize(ByteBuffer byteBuffer) { int size = ReadWriteIOUtils.readInt(byteBuffer); List<DownStreamChannelLocation> downStreamChannelLocationList = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java index 28cecd5f4e1..638c1c81e3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java @@ -55,10 +55,10 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { // The default order is TIMESTAMP_ASC, which means "order by timestamp asc" private Ordering scanOrder = Ordering.ASC; - // time filter for current series, could be null if doesn't exist + // time filter for current series, could be null if it doesn't exist @Nullable private Filter timeFilter; - // value filter for current series, could be null if doesn't exist + // value filter for current series, could be null if it doesn't exist @Nullable private Filter valueFilter; // Limit for result set. The default value is -1, which means no limit
