This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 53a2bee7b20 Fix the dead lock bug when the generated distributed query
plan contains cycle
53a2bee7b20 is described below
commit 53a2bee7b20d221c5cf01f7945cfa5708db94e87
Author: Beyyes <[email protected]>
AuthorDate: Tue Sep 26 18:03:53 2023 +0800
Fix the dead lock bug when the generated distributed query plan contains
cycle
---
.../distribution/DistributionPlanContext.java | 5 +-
.../planner/distribution/DistributionPlanner.java | 6 +-
.../planner/distribution/ExchangeNodeAdder.java | 72 ++++++++++----------
.../planner/distribution/NodeGroupContext.java | 18 +++--
.../plan/planner/distribution/SourceRewriter.java | 76 ++++++++++------------
.../node/process/last/LastQueryTransformNode.java | 7 ++
.../planner/plan/node/sink/IdentitySinkNode.java | 5 ++
.../plan/node/source/AlignedSeriesScanNode.java | 4 +-
8 files changed, 101 insertions(+), 92 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index 87efdf508b7..ce6b25c26dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -41,7 +41,6 @@ public class DistributionPlanContext {
protected DistributionPlanContext(MPPQueryContext queryContext) {
this.isRoot = true;
this.queryContext = queryContext;
- this.forceAddParent = false;
}
protected DistributionPlanContext copy() {
@@ -53,8 +52,8 @@ public class DistributionPlanContext {
return this;
}
- protected void setForceAddParent(boolean forceAddParent) {
- this.forceAddParent = forceAddParent;
+ protected void setForceAddParent() {
+ this.forceAddParent = true;
}
public void setOneSeriesInMultiRegion(boolean oneSeriesInMultiRegion) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 087ce5338eb..f0e016a5e52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -81,11 +81,7 @@ public class DistributionPlanner {
public PlanNode addExchangeNode(PlanNode root) {
ExchangeNodeAdder adder = new ExchangeNodeAdder(this.analysis);
NodeGroupContext nodeGroupContext =
- new NodeGroupContext(
- context,
- analysis.getStatement() instanceof QueryStatement
- && (((QueryStatement)
analysis.getStatement()).isAlignByDevice()),
- root);
+ new NodeGroupContext(context, analysis.getStatement(), root);
PlanNode newRoot = adder.visit(root, nodeGroupContext);
adjustUpStream(newRoot, nodeGroupContext);
return newRoot;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 6f2cce9a83f..779f101a52b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -59,8 +59,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -107,11 +105,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
private PlanNode internalVisitSchemaMerge(
AbstractSchemaMergeNode node, NodeGroupContext context) {
- node.getChildren()
- .forEach(
- child -> {
- visit(child, context);
- });
+ node.getChildren().forEach(child -> visit(child, context));
NodeDistribution nodeDistribution =
new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
PlanNode newNode = node.clone();
@@ -287,42 +281,38 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
}
MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
- List<PlanNode> visitedChildren = new ArrayList<>();
- node.getChildren().forEach(child -> visitedChildren.add(visit(child,
context)));
+ List<PlanNode> visitedChildren =
+ node.getChildren().stream()
+ .map(child -> visit(child, context))
+ .collect(Collectors.toList());
TRegionReplicaSet dataRegion;
- NodeDistributionType distributionType;
+ boolean isChildrenDistributionSame =
nodeDistributionIsSame(visitedChildren, context);
+ NodeDistributionType distributionType =
+ isChildrenDistributionSame
+ ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+ : NodeDistributionType.SAME_WITH_SOME_CHILD;
if (context.isAlignByDevice()) {
// For align by device,
// if dataRegions of children are the same, we set child's dataRegion to
this node,
// else we set the selected mostlyUsedDataRegion to this node
- boolean inSame = nodeDistributionIsSame(visitedChildren, context);
dataRegion =
- inSame
+ isChildrenDistributionSame
?
context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
: context.getMostlyUsedDataRegion();
context.putNodeDistribution(
- newNode.getPlanNodeId(),
- new NodeDistribution(
- inSame
- ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
- : NodeDistributionType.SAME_WITH_SOME_CHILD,
- dataRegion));
+ newNode.getPlanNodeId(), new NodeDistribution(distributionType,
dataRegion));
} else {
// TODO For align by time, we keep old logic for now
dataRegion = calculateDataRegionByChildren(visitedChildren, context);
- distributionType =
- nodeDistributionIsSame(visitedChildren, context)
- ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
- : NodeDistributionType.SAME_WITH_SOME_CHILD;
context.putNodeDistribution(
newNode.getPlanNodeId(), new NodeDistribution(distributionType,
dataRegion));
+ }
- // If the distributionType of all the children are same, no ExchangeNode
need to be added.
- if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
- newNode.setChildren(visitedChildren);
- return newNode;
- }
+ // If all the children have the same distributionType, no ExchangeNode
needs to be added.
+ if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
+ newNode.setChildren(visitedChildren);
+ return newNode;
}
// Otherwise, we need to add ExchangeNode for the child whose DataRegion
is different from the
@@ -383,6 +373,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
private TRegionReplicaSet calculateDataRegionByChildren(
List<PlanNode> children, NodeGroupContext context) {
+
// Step 1: calculate the count of children group by DataRegion.
Map<TRegionReplicaSet, Long> groupByRegion =
children.stream()
@@ -399,16 +390,27 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return region;
},
Collectors.counting()));
- if (groupByRegion.entrySet().size() == 1) {
- return groupByRegion.entrySet().iterator().next().getKey();
+
+ if (groupByRegion.size() == 1) {
+ return groupByRegion.keySet().iterator().next();
}
+
// Step 2: return the RegionReplicaSet with max count
- return Collections.max(
- groupByRegion.entrySet().stream()
- .filter(e -> e.getKey() != DataPartition.NOT_ASSIGNED)
- .collect(Collectors.toList()),
- Map.Entry.comparingByValue())
- .getKey();
+ long maxRegionCount = -1;
+ TRegionReplicaSet result = null;
+ for (Map.Entry<TRegionReplicaSet, Long> entry : groupByRegion.entrySet()) {
+ if (DataPartition.NOT_ASSIGNED.equals(entry.getKey())) {
+ continue;
+ }
+ if (entry.getKey().equals(context.getMostlyUsedDataRegion())) {
+ return entry.getKey();
+ }
+ if (entry.getValue() > maxRegionCount) {
+ maxRegionCount = entry.getValue();
+ result = entry.getKey();
+ }
+ }
+ return result;
}
private TRegionReplicaSet calculateSchemaRegionByChildren(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
index 14799292714..9ea36e7994c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
@@ -25,23 +25,31 @@ import
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class NodeGroupContext {
+
protected final MPPQueryContext queryContext;
private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
- private final boolean isAlignByDevice;
- private final TRegionReplicaSet mostlyUsedDataRegion;
+ private boolean isAlignByDevice;
+ private TRegionReplicaSet mostlyUsedDataRegion;
protected boolean hasExchangeNode;
- public NodeGroupContext(MPPQueryContext queryContext, boolean
isAlignByDevice, PlanNode root) {
+ public NodeGroupContext(MPPQueryContext queryContext, Statement statement,
PlanNode root) {
this.queryContext = queryContext;
this.nodeDistributionMap = new HashMap<>();
- this.isAlignByDevice = isAlignByDevice;
- this.mostlyUsedDataRegion = isAlignByDevice ?
getMostlyUsedDataRegion(root) : null;
+ if (statement instanceof QueryStatement) {
+ this.isAlignByDevice = ((QueryStatement) statement).isAlignByDevice();
+ this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root);
+ } else if (statement instanceof ShowTimeSeriesStatement) {
+ this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root);
+ }
this.hasExchangeNode = false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 0253dcd9231..14108d74e3c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -63,7 +63,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDe
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import java.util.ArrayList;
import java.util.Collections;
@@ -625,8 +624,8 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
public List<PlanNode> visitLastQuery(LastQueryNode node,
DistributionPlanContext context) {
// For last query, we need to ensure that every FI's root node is a
LastQueryMergeNode. So we
// force every region group to have a parent node even if there is only 1
child for it.
- context.setForceAddParent(true);
- PlanNode root = processRawMultiChildNode(node, context);
+ context.setForceAddParent();
+ PlanNode root = processRawMultiChildNode(node, context, true);
if (context.queryMultiRegion) {
PlanNode newRoot = genLastQueryRootNode(node, context);
// add sort op for each if we add LastQueryMergeNode as root
@@ -698,11 +697,11 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
if (containsAggregationSource(node)) {
return planAggregationWithTimeJoin(node, context);
}
- return Collections.singletonList(processRawMultiChildNode(node, context));
+ return Collections.singletonList(processRawMultiChildNode(node, context,
false));
}
private PlanNode processRawMultiChildNode(
- MultiChildProcessNode node, DistributionPlanContext context) {
+ MultiChildProcessNode node, DistributionPlanContext context, boolean
isLast) {
MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
// Step 1: Get all source nodes. For the node which is not source, add it
as the child of
// current TimeJoinNode
@@ -711,9 +710,10 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
if (child instanceof SeriesSourceNode) {
// If the child is SeriesScanNode, we need to check whether this node
should be separated
// into several splits.
- SeriesSourceNode handle = (SeriesSourceNode) child;
+ SeriesSourceNode sourceNode = (SeriesSourceNode) child;
List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(handle.getPartitionPath(),
handle.getPartitionTimeFilter());
+ analysis.getPartitionInfo(
+ sourceNode.getPartitionPath(),
sourceNode.getPartitionTimeFilter());
if (dataDistribution.size() > 1) {
// We mark this variable to `true` if there is some series which is
distributed in multi
// DataRegions
@@ -722,17 +722,17 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
// If the size of dataDistribution is m, this SeriesScanNode should be
separated into m
// SeriesScanNodes.
for (TRegionReplicaSet dataRegion : dataDistribution) {
- SeriesSourceNode split = (SeriesSourceNode) handle.clone();
+ SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
sources.add(split);
}
}
}
+
// Step 2: For the source nodes, group them by the DataRegion.
Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
if (sourceGroup.size() > 1) {
context.setQueryMultiRegion(true);
}
@@ -741,35 +741,31 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
// TODO: (xingtanzjr) optimize the procedure here to remove duplicated
TimeJoinNode
- final boolean[] addParent = {false};
- sourceGroup.forEach(
- (dataRegion, seriesScanNodes) -> {
- if (seriesScanNodes.size() == 1 && !context.forceAddParent) {
- root.addChild(seriesScanNodes.get(0));
- } else {
- // If there is only one RegionGroup here, we should not create new
MultiChildNode as the
- // parent.
- // If the size of RegionGroup is larger than 1, we need to
consider the value of
- // `forceAddParent`.
- // If `forceAddParent` is true, we should not create new
MultiChildNode as the parent,
- // either.
- // At last, we can use the parameter `addParent[0]` to judge
whether to create new
- // MultiChildNode.
- boolean appendToRootDirectly =
- sourceGroup.size() == 1 || (!addParent[0] &&
!context.forceAddParent);
- if (appendToRootDirectly) {
- seriesScanNodes.forEach(root::addChild);
- addParent[0] = true;
- } else {
- // We clone a TimeJoinNode from root to make the params to be
consistent.
- // But we need to assign a new ID to it
- MultiChildProcessNode parentOfGroup = (MultiChildProcessNode)
root.clone();
-
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- seriesScanNodes.forEach(parentOfGroup::addChild);
- root.addChild(parentOfGroup);
- }
- }
- });
+ boolean addParent = false;
+ for (List<SourceNode> seriesScanNodes : sourceGroup.values()) {
+ if (seriesScanNodes.size() == 1 && (!context.forceAddParent || !isLast))
{
+ root.addChild(seriesScanNodes.get(0));
+ continue;
+ }
+ // If there is only one RegionGroup, we should not create a new MultiChildNode
as the parent.
+ // If there is more than one RegionGroup, we need to consider the value of
`forceAddParent`.
+ // If `forceAddParent` is true, every group gets a new MultiChildNode as its
parent.
+ // Otherwise, the flag `addParent` decides: the first such group is appended to
+ // root directly, and each later group gets a new MultiChildNode as its parent.
+ boolean appendToRootDirectly =
+ sourceGroup.size() == 1 || (!addParent && !context.forceAddParent);
+ if (appendToRootDirectly) {
+ seriesScanNodes.forEach(root::addChild);
+ addParent = true;
+ } else {
+ // We clone a TimeJoinNode from root to make the params to be
consistent.
+ // But we need to assign a new ID to it
+ MultiChildProcessNode parentOfGroup = (MultiChildProcessNode)
root.clone();
+
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ seriesScanNodes.forEach(parentOfGroup::addChild);
+ root.addChild(parentOfGroup);
+ }
+ }
// Process the other children which are not SeriesSourceNode
for (PlanNode child : node.getChildren()) {
@@ -784,10 +780,6 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
return root;
}
- private boolean isAggregationQuery() {
- return ((QueryStatement) analysis.getStatement()).isAggregationQuery();
- }
-
private boolean containsAggregationSource(TimeJoinNode node) {
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesAggregationScanNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
index fb2133f7dc2..108189259e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
@@ -108,6 +108,13 @@ public class LastQueryTransformNode extends
SingleChildProcessNode {
return Objects.hash(super.hashCode(), viewPath, dataType);
}
+ @Override
+ public String toString() {
+ return String.format(
+ "LastQueryTransformNode-%s:[ViewPath: %s, DataType: %s]",
+ this.getPlanNodeId(), viewPath, dataType);
+ }
+
public String getViewPath() {
return this.viewPath;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
index 7b7b5e8b500..b2197609e84 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
@@ -87,6 +87,11 @@ public class IdentitySinkNode extends MultiChildrenSinkNode {
}
}
+ @Override
+ public String toString() {
+ return String.format("IdentitySinkNode-%s", this.getPlanNodeId());
+ }
+
public static IdentitySinkNode deserialize(ByteBuffer byteBuffer) {
int size = ReadWriteIOUtils.readInt(byteBuffer);
List<DownStreamChannelLocation> downStreamChannelLocationList = new
ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 28cecd5f4e1..638c1c81e3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -55,10 +55,10 @@ public class AlignedSeriesScanNode extends SeriesSourceNode
{
// The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
private Ordering scanOrder = Ordering.ASC;
- // time filter for current series, could be null if doesn't exist
+ // time filter for current series, could be null if it doesn't exist
@Nullable private Filter timeFilter;
- // value filter for current series, could be null if doesn't exist
+ // value filter for current series, could be null if it doesn't exist
@Nullable private Filter valueFilter;
// Limit for result set. The default value is -1, which means no limit