This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/polish_node in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bf45a011333ca8eb602c404eaec234ac339ff3c8 Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Mar 28 17:44:29 2022 +0800 fix the bug in distribution planner --- .../sql/planner/plan/SimpleFragmentParallelPlanner.java | 3 ++- .../mpp/sql/planner/plan/node/process/ExchangeNode.java | 4 +++- .../mpp/sql/planner/plan/node/sink/FragmentSinkNode.java | 15 +++++++++++++-- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java index 3603310..1be8d95 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java @@ -72,7 +72,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { // one by one int instanceIdx = 0; PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot()); - FragmentInstance fragmentInstance = new FragmentInstance(fragment, instanceIdx); + FragmentInstance fragmentInstance = + new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx); // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one // of them. diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java index 80bbf41..0df6d36 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java @@ -60,7 +60,9 @@ public class ExchangeNode extends PlanNode { public PlanNode clone() { ExchangeNode node = new ExchangeNode(getId()); if (remoteSourceNode != null) { - node.setRemoteSourceNode((FragmentSinkNode) remoteSourceNode.clone()); + FragmentSinkNode remoteSourceNodeClone = (FragmentSinkNode) remoteSourceNode.clone(); + remoteSourceNodeClone.setDownStreamNode(node); + node.setRemoteSourceNode(remoteSourceNode); } return node; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java index 3ab12c7..aeb129f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.service.rpc.thrift.EndPoint; import com.google.common.collect.ImmutableList; +import org.apache.commons.lang.Validate; import java.nio.ByteBuffer; import java.util.List; @@ -51,12 +52,22 @@ public class FragmentSinkNode extends SinkNode { @Override public PlanNode clone() { - return null; + FragmentSinkNode sinkNode = new FragmentSinkNode(getId()); + sinkNode.setDownStream(downStreamEndpoint, downStreamInstanceId, downStreamPlanNodeId); + sinkNode.setDownStreamNode(downStreamNode); + return sinkNode; } @Override public PlanNode cloneWithChildren(List<PlanNode> children) { - return null; + Validate.isTrue( + children == null || children.size() == 1, + "Children size of FragmentSinkNode should be 0 or 1"); + FragmentSinkNode sinkNode = (FragmentSinkNode) clone(); + if (children != null) { + sinkNode.setChild(children.get(0)); + } + return sinkNode; } @Override
