This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/rel-1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 81f30668ef71278a20c760ed688542b9982a4009
Author: Beyyes <[email protected]>
AuthorDate: Tue Sep 26 18:03:53 2023 +0800

    Fix the deadlock bug when the generated distributed query plan contains a
cycle
---
 .../distribution/DistributionPlanContext.java      |  5 +-
 .../planner/distribution/DistributionPlanner.java  |  6 +-
 .../planner/distribution/ExchangeNodeAdder.java    | 72 ++++++++++----------
 .../planner/distribution/NodeGroupContext.java     | 18 +++--
 .../plan/planner/distribution/SourceRewriter.java  | 76 ++++++++++------------
 .../node/process/last/LastQueryTransformNode.java  |  7 ++
 .../planner/plan/node/sink/IdentitySinkNode.java   |  5 ++
 .../plan/node/source/AlignedSeriesScanNode.java    |  4 +-
 8 files changed, 101 insertions(+), 92 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index 87efdf508b7..ce6b25c26dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -41,7 +41,6 @@ public class DistributionPlanContext {
   protected DistributionPlanContext(MPPQueryContext queryContext) {
     this.isRoot = true;
     this.queryContext = queryContext;
-    this.forceAddParent = false;
   }
 
   protected DistributionPlanContext copy() {
@@ -53,8 +52,8 @@ public class DistributionPlanContext {
     return this;
   }
 
-  protected void setForceAddParent(boolean forceAddParent) {
-    this.forceAddParent = forceAddParent;
+  protected void setForceAddParent() {
+    this.forceAddParent = true;
   }
 
   public void setOneSeriesInMultiRegion(boolean oneSeriesInMultiRegion) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 087ce5338eb..f0e016a5e52 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -81,11 +81,7 @@ public class DistributionPlanner {
   public PlanNode addExchangeNode(PlanNode root) {
     ExchangeNodeAdder adder = new ExchangeNodeAdder(this.analysis);
     NodeGroupContext nodeGroupContext =
-        new NodeGroupContext(
-            context,
-            analysis.getStatement() instanceof QueryStatement
-                && (((QueryStatement) 
analysis.getStatement()).isAlignByDevice()),
-            root);
+        new NodeGroupContext(context, analysis.getStatement(), root);
     PlanNode newRoot = adder.visit(root, nodeGroupContext);
     adjustUpStream(newRoot, nodeGroupContext);
     return newRoot;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 6f2cce9a83f..779f101a52b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -59,8 +59,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -107,11 +105,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
 
   private PlanNode internalVisitSchemaMerge(
       AbstractSchemaMergeNode node, NodeGroupContext context) {
-    node.getChildren()
-        .forEach(
-            child -> {
-              visit(child, context);
-            });
+    node.getChildren().forEach(child -> visit(child, context));
     NodeDistribution nodeDistribution =
         new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
     PlanNode newNode = node.clone();
@@ -287,42 +281,38 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     }
 
     MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
-    List<PlanNode> visitedChildren = new ArrayList<>();
-    node.getChildren().forEach(child -> visitedChildren.add(visit(child, 
context)));
+    List<PlanNode> visitedChildren =
+        node.getChildren().stream()
+            .map(child -> visit(child, context))
+            .collect(Collectors.toList());
 
     TRegionReplicaSet dataRegion;
-    NodeDistributionType distributionType;
+    boolean isChildrenDistributionSame = 
nodeDistributionIsSame(visitedChildren, context);
+    NodeDistributionType distributionType =
+        isChildrenDistributionSame
+            ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+            : NodeDistributionType.SAME_WITH_SOME_CHILD;
     if (context.isAlignByDevice()) {
       // For align by device,
       // if dataRegions of children are the same, we set child's dataRegion to 
this node,
       // else we set the selected mostlyUsedDataRegion to this node
-      boolean inSame = nodeDistributionIsSame(visitedChildren, context);
       dataRegion =
-          inSame
+          isChildrenDistributionSame
               ? 
context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
               : context.getMostlyUsedDataRegion();
       context.putNodeDistribution(
-          newNode.getPlanNodeId(),
-          new NodeDistribution(
-              inSame
-                  ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
-                  : NodeDistributionType.SAME_WITH_SOME_CHILD,
-              dataRegion));
+          newNode.getPlanNodeId(), new NodeDistribution(distributionType, 
dataRegion));
     } else {
       // TODO For align by time, we keep old logic for now
       dataRegion = calculateDataRegionByChildren(visitedChildren, context);
-      distributionType =
-          nodeDistributionIsSame(visitedChildren, context)
-              ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
-              : NodeDistributionType.SAME_WITH_SOME_CHILD;
       context.putNodeDistribution(
           newNode.getPlanNodeId(), new NodeDistribution(distributionType, 
dataRegion));
+    }
 
-      // If the distributionType of all the children are same, no ExchangeNode 
need to be added.
-      if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
-        newNode.setChildren(visitedChildren);
-        return newNode;
-      }
+    // If the distributionType of all the children are same, no ExchangeNode 
need to be added.
+    if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
+      newNode.setChildren(visitedChildren);
+      return newNode;
     }
 
     // Otherwise, we need to add ExchangeNode for the child whose DataRegion 
is different from the
@@ -383,6 +373,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
 
   private TRegionReplicaSet calculateDataRegionByChildren(
       List<PlanNode> children, NodeGroupContext context) {
+
     // Step 1: calculate the count of children group by DataRegion.
     Map<TRegionReplicaSet, Long> groupByRegion =
         children.stream()
@@ -399,16 +390,27 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
                       return region;
                     },
                     Collectors.counting()));
-    if (groupByRegion.entrySet().size() == 1) {
-      return groupByRegion.entrySet().iterator().next().getKey();
+
+    if (groupByRegion.size() == 1) {
+      return groupByRegion.keySet().iterator().next();
     }
+
     // Step 2: return the RegionReplicaSet with max count
-    return Collections.max(
-            groupByRegion.entrySet().stream()
-                .filter(e -> e.getKey() != DataPartition.NOT_ASSIGNED)
-                .collect(Collectors.toList()),
-            Map.Entry.comparingByValue())
-        .getKey();
+    long maxRegionCount = -1;
+    TRegionReplicaSet result = null;
+    for (Map.Entry<TRegionReplicaSet, Long> entry : groupByRegion.entrySet()) {
+      if (DataPartition.NOT_ASSIGNED.equals(entry.getKey())) {
+        continue;
+      }
+      if (entry.getKey().equals(context.getMostlyUsedDataRegion())) {
+        return entry.getKey();
+      }
+      if (entry.getValue() > maxRegionCount) {
+        maxRegionCount = entry.getValue();
+        result = entry.getKey();
+      }
+    }
+    return result;
   }
 
   private TRegionReplicaSet calculateSchemaRegionByChildren(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
index 14799292714..9ea36e7994c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
@@ -25,23 +25,31 @@ import 
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 public class NodeGroupContext {
+
   protected final MPPQueryContext queryContext;
   private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
-  private final boolean isAlignByDevice;
-  private final TRegionReplicaSet mostlyUsedDataRegion;
+  private boolean isAlignByDevice;
+  private TRegionReplicaSet mostlyUsedDataRegion;
   protected boolean hasExchangeNode;
 
-  public NodeGroupContext(MPPQueryContext queryContext, boolean 
isAlignByDevice, PlanNode root) {
+  public NodeGroupContext(MPPQueryContext queryContext, Statement statement, 
PlanNode root) {
     this.queryContext = queryContext;
     this.nodeDistributionMap = new HashMap<>();
-    this.isAlignByDevice = isAlignByDevice;
-    this.mostlyUsedDataRegion = isAlignByDevice ? 
getMostlyUsedDataRegion(root) : null;
+    if (statement instanceof QueryStatement) {
+      this.isAlignByDevice = ((QueryStatement) statement).isAlignByDevice();
+      this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root);
+    } else if (statement instanceof ShowTimeSeriesStatement) {
+      this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root);
+    }
     this.hasExchangeNode = false;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 91e818cb43c..ec481a98566 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -63,7 +63,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDe
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -625,8 +624,8 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
   public List<PlanNode> visitLastQuery(LastQueryNode node, 
DistributionPlanContext context) {
     // For last query, we need to keep every FI's root node is 
LastQueryMergeNode. So we
     // force every region group have a parent node even if there is only 1 
child for it.
-    context.setForceAddParent(true);
-    PlanNode root = processRawMultiChildNode(node, context);
+    context.setForceAddParent();
+    PlanNode root = processRawMultiChildNode(node, context, true);
     if (context.queryMultiRegion) {
       PlanNode newRoot = genLastQueryRootNode(node, context);
       // add sort op for each if we add LastQueryMergeNode as root
@@ -698,11 +697,11 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     if (containsAggregationSource(node)) {
       return planAggregationWithTimeJoin(node, context);
     }
-    return Collections.singletonList(processRawMultiChildNode(node, context));
+    return Collections.singletonList(processRawMultiChildNode(node, context, 
false));
   }
 
   private PlanNode processRawMultiChildNode(
-      MultiChildProcessNode node, DistributionPlanContext context) {
+      MultiChildProcessNode node, DistributionPlanContext context, boolean 
isLast) {
     MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
     // Step 1: Get all source nodes. For the node which is not source, add it 
as the child of
     // current TimeJoinNode
@@ -711,9 +710,10 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       if (child instanceof SeriesSourceNode) {
         // If the child is SeriesScanNode, we need to check whether this node 
should be seperated
         // into several splits.
-        SeriesSourceNode handle = (SeriesSourceNode) child;
+        SeriesSourceNode sourceNode = (SeriesSourceNode) child;
         List<TRegionReplicaSet> dataDistribution =
-            analysis.getPartitionInfo(handle.getPartitionPath(), 
handle.getPartitionTimeFilter());
+            analysis.getPartitionInfo(
+                sourceNode.getPartitionPath(), 
sourceNode.getPartitionTimeFilter());
         if (dataDistribution.size() > 1) {
           // We mark this variable to `true` if there is some series which is 
distributed in multi
           // DataRegions
@@ -722,17 +722,17 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
         // If the size of dataDistribution is m, this SeriesScanNode should be 
seperated into m
         // SeriesScanNode.
         for (TRegionReplicaSet dataRegion : dataDistribution) {
-          SeriesSourceNode split = (SeriesSourceNode) handle.clone();
+          SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
           
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
           split.setRegionReplicaSet(dataRegion);
           sources.add(split);
         }
       }
     }
+
     // Step 2: For the source nodes, group them by the DataRegion.
     Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
         
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
     if (sourceGroup.size() > 1) {
       context.setQueryMultiRegion(true);
     }
@@ -741,35 +741,31 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     // and make the
     // new TimeJoinNode as the child of current TimeJoinNode
     // TODO: (xingtanzjr) optimize the procedure here to remove duplicated 
TimeJoinNode
-    final boolean[] addParent = {false};
-    sourceGroup.forEach(
-        (dataRegion, seriesScanNodes) -> {
-          if (seriesScanNodes.size() == 1 && !context.forceAddParent) {
-            root.addChild(seriesScanNodes.get(0));
-          } else {
-            // If there is only one RegionGroup here, we should not create new 
MultiChildNode as the
-            // parent.
-            // If the size of RegionGroup is larger than 1, we need to 
consider the value of
-            // `forceAddParent`.
-            // If `forceAddParent` is true, we should not create new 
MultiChildNode as the parent,
-            // either.
-            // At last, we can use the parameter `addParent[0]` to judge 
whether to create new
-            // MultiChildNode.
-            boolean appendToRootDirectly =
-                sourceGroup.size() == 1 || (!addParent[0] && 
!context.forceAddParent);
-            if (appendToRootDirectly) {
-              seriesScanNodes.forEach(root::addChild);
-              addParent[0] = true;
-            } else {
-              // We clone a TimeJoinNode from root to make the params to be 
consistent.
-              // But we need to assign a new ID to it
-              MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) 
root.clone();
-              
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-              seriesScanNodes.forEach(parentOfGroup::addChild);
-              root.addChild(parentOfGroup);
-            }
-          }
-        });
+    boolean addParent = false;
+    for (List<SourceNode> seriesScanNodes : sourceGroup.values()) {
+      if (seriesScanNodes.size() == 1 && (!context.forceAddParent || !isLast)) 
{
+        root.addChild(seriesScanNodes.get(0));
+        continue;
+      }
+      // If size of RegionGroup = 1, we should not create new MultiChildNode 
as the parent.
+      // If size of RegionGroup > 1, we need to consider the value of 
`forceAddParent`.
+      // If `forceAddParent` is true, we should not create new MultiChildNode 
as the parent, either.
+      // At last, we can use the parameter `addParent` to judge whether to 
create new
+      // MultiChildNode.
+      boolean appendToRootDirectly =
+          sourceGroup.size() == 1 || (!addParent && !context.forceAddParent);
+      if (appendToRootDirectly) {
+        seriesScanNodes.forEach(root::addChild);
+        addParent = true;
+      } else {
+        // We clone a TimeJoinNode from root to make the params to be 
consistent.
+        // But we need to assign a new ID to it
+        MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) 
root.clone();
+        
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+        seriesScanNodes.forEach(parentOfGroup::addChild);
+        root.addChild(parentOfGroup);
+      }
+    }
 
     // Process the other children which are not SeriesSourceNode
     for (PlanNode child : node.getChildren()) {
@@ -784,10 +780,6 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     return root;
   }
 
-  private boolean isAggregationQuery() {
-    return ((QueryStatement) analysis.getStatement()).isAggregationQuery();
-  }
-
   private boolean containsAggregationSource(TimeJoinNode node) {
     for (PlanNode child : node.getChildren()) {
       if (child instanceof SeriesAggregationScanNode
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
index fb2133f7dc2..108189259e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
@@ -108,6 +108,13 @@ public class LastQueryTransformNode extends 
SingleChildProcessNode {
     return Objects.hash(super.hashCode(), viewPath, dataType);
   }
 
+  @Override
+  public String toString() {
+    return String.format(
+        "LastQueryTransformNode-%s:[ViewPath: %s, DataType: %s]",
+        this.getPlanNodeId(), viewPath, dataType);
+  }
+
   public String getViewPath() {
     return this.viewPath;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
index 7b7b5e8b500..b2197609e84 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
@@ -87,6 +87,11 @@ public class IdentitySinkNode extends MultiChildrenSinkNode {
     }
   }
 
+  @Override
+  public String toString() {
+    return String.format("IdentitySinkNode-%s", this.getPlanNodeId());
+  }
+
   public static IdentitySinkNode deserialize(ByteBuffer byteBuffer) {
     int size = ReadWriteIOUtils.readInt(byteBuffer);
     List<DownStreamChannelLocation> downStreamChannelLocationList = new 
ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 28cecd5f4e1..638c1c81e3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -55,10 +55,10 @@ public class AlignedSeriesScanNode extends SeriesSourceNode 
{
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
   private Ordering scanOrder = Ordering.ASC;
 
-  // time filter for current series, could be null if doesn't exist
+  // time filter for current series, could be null if it doesn't exist
   @Nullable private Filter timeFilter;
 
-  // value filter for current series, could be null if doesn't exist
+  // value filter for current series, could be null if it doesn't exist
   @Nullable private Filter valueFilter;
 
   // Limit for result set. The default value is -1, which means no limit

Reply via email to