This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bf45a011333ca8eb602c404eaec234ac339ff3c8
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Mar 28 17:44:29 2022 +0800

    fix the bug in distribution planner
---
 .../sql/planner/plan/SimpleFragmentParallelPlanner.java   |  3 ++-
 .../mpp/sql/planner/plan/node/process/ExchangeNode.java   |  4 +++-
 .../mpp/sql/planner/plan/node/sink/FragmentSinkNode.java  | 15 +++++++++++++--
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 3603310..1be8d95 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -72,7 +72,8 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
     // one by one
     int instanceIdx = 0;
     PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
-    FragmentInstance fragmentInstance = new FragmentInstance(fragment, 
instanceIdx);
+    FragmentInstance fragmentInstance =
+        new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), 
instanceIdx);
 
     // Get the target DataRegion for origin PlanFragment, then its instance 
will be distributed one
     // of them.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 80bbf41..0df6d36 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -60,7 +60,9 @@ public class ExchangeNode extends PlanNode {
   public PlanNode clone() {
     ExchangeNode node = new ExchangeNode(getId());
     if (remoteSourceNode != null) {
-      node.setRemoteSourceNode((FragmentSinkNode) remoteSourceNode.clone());
+      FragmentSinkNode remoteSourceNodeClone = (FragmentSinkNode) 
remoteSourceNode.clone();
+      remoteSourceNodeClone.setDownStreamNode(node);
+      node.setRemoteSourceNode(remoteSourceNode);
     }
     return node;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 3ab12c7..aeb129f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.Validate;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -51,12 +52,22 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public PlanNode clone() {
-    return null;
+    FragmentSinkNode sinkNode = new FragmentSinkNode(getId());
+    sinkNode.setDownStream(downStreamEndpoint, downStreamInstanceId, 
downStreamPlanNodeId);
+    sinkNode.setDownStreamNode(downStreamNode);
+    return sinkNode;
   }
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+    Validate.isTrue(
+        children == null || children.size() == 1,
+        "Children size of FragmentSinkNode should be 0 or 1");
+    FragmentSinkNode sinkNode = (FragmentSinkNode) clone();
+    if (children != null) {
+      sinkNode.setChild(children.get(0));
+    }
+    return sinkNode;
   }
 
   @Override

Reply via email to