This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 850198002ecf1da7999bc8468632d1fc16eac62f Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Apr 13 20:01:43 2022 +0800 tmp save --- .../apache/iotdb/db/mpp/common/PlanFragmentId.java | 4 +- .../db/mpp/sql/planner/DistributionPlanner.java | 19 +++++-- .../db/mpp/sql/planner/plan/FragmentInstance.java | 27 +++------- .../plan/SimpleFragmentParallelPlanner.java | 9 +--- .../planner/plan/WriteFragmentParallelPlanner.java | 61 ++++++++++++++++++++++ ...lePlanNodeRewriter.java => IWritePlanNode.java} | 24 +++------ .../planner/plan/node/SimplePlanNodeRewriter.java | 4 ++ .../plan/node/write/InsertMultiTabletNode.java | 3 +- .../plan/node/write/InsertMultiTabletsNode.java | 9 ++-- .../sql/planner/plan/node/write/InsertNode.java | 13 ++--- .../sql/planner/plan/node/write/InsertRowNode.java | 3 +- .../planner/plan/node/write/InsertRowsNode.java | 3 +- .../plan/node/write/InsertRowsOfOneDeviceNode.java | 3 +- .../planner/plan/node/write/InsertTabletNode.java | 5 +- .../db/mpp/sql/plan/DistributionPlannerTest.java | 39 ++++++++++++-- .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 22 ++++---- 16 files changed, 168 insertions(+), 80 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java index e4bdd7d306..4d8d098e56 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java @@ -89,9 +89,7 @@ public class PlanFragmentId { return false; } PlanFragmentId that = (PlanFragmentId) o; - return id == that.id - && nextFragmentInstanceId == that.nextFragmentInstanceId - && Objects.equals(queryId, that.queryId); + return id == that.id && Objects.equals(queryId, that.queryId); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java index 96341a2ca1..f87979a2a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.analyze.QueryType; import org.apache.iotdb.db.mpp.sql.planner.plan.*; import org.apache.iotdb.db.mpp.sql.planner.plan.node.*; import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode; @@ -71,7 +72,10 @@ public class DistributionPlanner { PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite); SubPlan subPlan = splitFragment(rootWithExchange); List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan); - SetSinkForRootInstance(subPlan, fragmentInstances); + // Only execute this step for READ operation + if (context.getQueryType() == QueryType.READ) { + SetSinkForRootInstance(subPlan, fragmentInstances); + } return new DistributedQueryPlan( logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances); } @@ -79,8 +83,9 @@ public class DistributionPlanner { // Convert fragment to detailed instance // And for parallel-able fragment, clone it into several instances with different params. public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) { - IFragmentParallelPlaner parallelPlaner = - new SimpleFragmentParallelPlanner(subPlan, analysis, context); + IFragmentParallelPlaner parallelPlaner = context.getQueryType() == QueryType.READ ? + new SimpleFragmentParallelPlanner(subPlan, analysis, context) : + new WriteFragmentParallelPlanner(subPlan, analysis, context); return parallelPlaner.parallelPlan(); } @@ -196,6 +201,10 @@ public class DistributionPlanner { private class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { @Override public PlanNode visitPlan(PlanNode node, NodeGroupContext context) { + // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently + if (node instanceof IWritePlanNode) { + return node; + } // Visit all the children of current node List<PlanNode> children = node.getChildren().stream() @@ -341,6 +350,10 @@ public class DistributionPlanner { } private void splitToSubPlan(PlanNode root, SubPlan subPlan) { + // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently + if (root instanceof IWritePlanNode) { + return; + } if (root instanceof ExchangeNode) { // We add a FragmentSinkNode for newly created PlanFragment ExchangeNode exchangeNode = (ExchangeNode) root; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java index 03fd9f3e0e..c263732b00 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.cluster.Endpoint; import org.apache.iotdb.commons.partition.RegionReplicaSet; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; -import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.sql.analyze.QueryType; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil; @@ -49,33 +48,27 @@ public class FragmentInstance implements IConsensusRequest { // We can add some more params for a specific FragmentInstance // So that we can make different FragmentInstance owns different data range. - public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) { + public FragmentInstance(PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) { this.fragment = fragment; this.timeFilter = timeFilter; - this.id = generateId(fragment.getId(), index); + this.id = id; this.type = type; } - public static FragmentInstanceId generateId(PlanFragmentId id, int index) { - return new FragmentInstanceId(id, String.valueOf(index)); - } - public RegionReplicaSet getDataRegionId() { return dataRegion; } - public void setDataRegionId(RegionReplicaSet dataRegion) { + public void setDataRegionAndHost(RegionReplicaSet dataRegion) { this.dataRegion = dataRegion; + // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current instance + this.hostEndpoint = dataRegion.getDataNodeList().get(0).getEndPoint(); } public Endpoint getHostEndpoint() { return hostEndpoint; } - public void setHostEndpoint(Endpoint hostEndpoint) { - this.hostEndpoint = hostEndpoint; - } - public PlanFragment getFragment() { return fragment; } @@ -127,8 +120,7 @@ public class FragmentInstance implements IConsensusRequest { Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null; QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)]; FragmentInstance fragmentInstance = - new FragmentInstance( - planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType); + new FragmentInstance(planFragment, id, timeFilter, queryType); RegionReplicaSet regionReplicaSet = new RegionReplicaSet(); try { regionReplicaSet.deserializeImpl(buffer); @@ -161,12 +153,7 @@ public class FragmentInstance implements IConsensusRequest { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; FragmentInstance instance = (FragmentInstance) o; - return Objects.equals(id, instance.id) - && type == instance.type - && Objects.equals(fragment, instance.fragment) - && Objects.equals(dataRegion, instance.dataRegion) - && Objects.equals(hostEndpoint, instance.hostEndpoint) - && Objects.equals(timeFilter, instance.timeFilter); + return Objects.equals(id, instance.id) && type == instance.type && Objects.equals(fragment, instance.fragment) && Objects.equals(dataRegion, instance.dataRegion) && Objects.equals(hostEndpoint, instance.hostEndpoint) && Objects.equals(timeFilter, instance.timeFilter); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java index c2014c55d2..280d9891af 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java @@ -79,7 +79,6 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { private void produceFragmentInstance(PlanFragment fragment) { // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased // one by one - int instanceIdx = 0; PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot()); Filter timeFilter = analysis.getQueryFilter() == null @@ -88,7 +87,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { FragmentInstance fragmentInstance = new FragmentInstance( new PlanFragment(fragment.getId(), rootCopy), - instanceIdx, + fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType()); @@ -100,11 +99,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { // We need to store all the replica host in case of the scenario that the instance need to be // redirected // to another host when scheduling - fragmentInstance.setDataRegionId(dataRegion); - - // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current - // instance - fragmentInstance.setHostEndpoint(dataRegion.getDataNodeList().get(0).getEndPoint()); + fragmentInstance.setDataRegionAndHost(dataRegion); instanceMap.putIfAbsent(fragment.getId(), fragmentInstance); fragmentInstanceList.add(fragmentInstance); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java new file mode 100644 index 0000000000..eebf705cd3 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.sql.planner.plan; + +import org.apache.iotdb.db.mpp.common.MPPQueryContext; +import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; +import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +import java.util.ArrayList; +import java.util.List; + +public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner{ + + private SubPlan subPlan; + private Analysis analysis; + private MPPQueryContext queryContext; + + public WriteFragmentParallelPlanner(SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) { + this.subPlan = subPlan; + this.analysis = analysis; + this.queryContext = queryContext; + } + + @Override + public List<FragmentInstance> parallelPlan() { + PlanFragment fragment = subPlan.getPlanFragment(); + Filter timeFilter = analysis.getQueryFilter() != null ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter() : null; + PlanNode node = fragment.getRoot(); + if (!(node instanceof IWritePlanNode)) { + throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation"); + } + List<IWritePlanNode> splits = ((IWritePlanNode) node).splitByPartition(analysis); + List<FragmentInstance> ret = new ArrayList<>(); + for (IWritePlanNode split : splits) { + FragmentInstance instance = new FragmentInstance(new PlanFragment(fragment.getId(), split), fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType()); + instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet()); + ret.add(instance); + } + return ret; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/IWritePlanNode.java similarity index 58% copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/IWritePlanNode.java index e0ca8f6cfd..7170952721 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/IWritePlanNode.java @@ -19,26 +19,18 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node; +import org.apache.iotdb.commons.partition.RegionReplicaSet; +import org.apache.iotdb.db.mpp.sql.analyze.Analysis; + import java.util.List; -import static com.google.common.collect.ImmutableList.toImmutableList; +public abstract class IWritePlanNode extends PlanNode { -public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> { - @Override - public PlanNode visitPlan(PlanNode node, C context) { - return defaultRewrite(node, context); + protected IWritePlanNode(PlanNodeId id) { + super(id); } - public PlanNode defaultRewrite(PlanNode node, C context) { - List<PlanNode> children = - node.getChildren().stream() - .map(child -> rewrite(child, context)) - .collect(toImmutableList()); - - return node.cloneWithChildren(children); - } + public abstract RegionReplicaSet getRegionReplicaSet(); - public PlanNode rewrite(PlanNode node, C userContext) { - return node.accept(this, userContext); - } + public abstract List<IWritePlanNode> splitByPartition(Analysis analysis); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java index e0ca8f6cfd..b96002acb3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java @@ -26,6 +26,10 @@ import static com.google.common.collect.ImmutableList.toImmutableList; public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> { @Override public PlanNode visitPlan(PlanNode node, C context) { + // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently + if (node instanceof IWritePlanNode) { + return node; + } return defaultRewrite(node, context); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java index ede58ff6b8..ef1c570457 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.exception.NotImplementedException; @@ -33,7 +34,7 @@ public class InsertMultiTabletNode extends InsertNode { } @Override - public List<InsertNode> splitByPartition(Analysis analysis) { + public List<IWritePlanNode> splitByPartition(Analysis analysis) { return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java index 2a11bc6f9a..49b1065a05 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.partition.RegionReplicaSet; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.exception.NotImplementedException; @@ -107,13 +108,13 @@ public class InsertMultiTabletsNode extends InsertNode { } @Override - public List<InsertNode> splitByPartition(Analysis analysis) { + public List<IWritePlanNode> splitByPartition(Analysis analysis) { Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>(); for (int i = 0; i < insertTabletNodeList.size(); i++) { InsertTabletNode insertTabletNode = insertTabletNodeList.get(i); - List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis); - for (InsertNode subNode : tmpResult) { - RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet(); + List<IWritePlanNode> tmpResult = insertTabletNode.splitByPartition(analysis); + for (IWritePlanNode subNode : tmpResult) { + RegionReplicaSet dataRegionReplicaSet = ((InsertNode)subNode).getDataRegionReplicaSet(); if (splitMap.containsKey(dataRegionReplicaSet)) { InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet); tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java index 30b091d83b..d7eef6b50b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java @@ -21,17 +21,14 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write; import org.apache.iotdb.commons.partition.RegionReplicaSet; import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID; import org.apache.iotdb.db.metadata.path.PartialPath; -import org.apache.iotdb.db.mpp.sql.analyze.Analysis; -import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - import java.nio.ByteBuffer; -import java.util.List; -public abstract class InsertNode extends PlanNode { +public abstract class InsertNode extends IWritePlanNode { /** * if use id table, this filed is id form of device path <br> @@ -130,9 +127,9 @@ public abstract class InsertNode extends PlanNode { this.deviceID = deviceID; } - // TODO(INSERT) split this insert node into multiple InsertNode according to the data partition - // info - public abstract List<InsertNode> splitByPartition(Analysis analysis); + public RegionReplicaSet getRegionReplicaSet() { + return dataRegionReplicaSet; + } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java index 6e937bef25..e84b461c87 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.partition.TimePartitionSlot; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.wal.buffer.IWALByteBufferView; @@ -57,7 +58,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { } @Override - public List<InsertNode> splitByPartition(Analysis analysis) { + public List<IWritePlanNode> splitByPartition(Analysis analysis) { TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(time); this.dataRegionReplicaSet = analysis diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java index bcd4eedc07..3cba509506 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.partition.RegionReplicaSet; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.exception.NotImplementedException; @@ -109,7 +110,7 @@ public class InsertRowsNode extends InsertNode { public void serialize(ByteBuffer byteBuffer) {} @Override - public List<InsertNode> splitByPartition(Analysis analysis) { + public List<IWritePlanNode> splitByPartition(Analysis analysis) { Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>(); for (int i = 0; i < insertRowNodeList.size(); i++) { InsertRowNode insertRowNode = insertRowNodeList.get(i); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java index da5b729318..8dc7fd7671 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.partition.RegionReplicaSet; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.exception.NotImplementedException; @@ -105,7 +106,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode { public void serialize(ByteBuffer byteBuffer) {} @Override - public List<InsertNode> splitByPartition(Analysis analysis) { + public List<IWritePlanNode> splitByPartition(Analysis analysis) { Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>(); for (int i = 0; i < insertRowNodeList.size(); i++) { InsertRowNode insertRowNode = insertRowNodeList.get(i); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java index d27d0dafd0..a0a262fb5a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.partition.TimePartitionSlot; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.wal.buffer.IWALByteBufferView; @@ -149,9 +150,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {} @Override - public List<InsertNode> splitByPartition(Analysis analysis) { + public List<IWritePlanNode> splitByPartition(Analysis analysis) { // only single device in single storage group - List<InsertNode> result = new ArrayList<>(); + List<IWritePlanNode> result = new ArrayList<>(); if (times.length == 0) { return Collections.emptyList(); } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java index 03db5bb4ff..ed192c357b 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.partition.SeriesPartitionSlot; import org.apache.iotdb.commons.partition.TimePartitionSlot; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; @@ -40,10 +41,13 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import com.google.common.collect.Sets; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.junit.Test; import java.util.ArrayList; @@ -201,6 +205,35 @@ public class DistributionPlannerTest { assertEquals(3, plan.getInstances().size()); } + @Test + public void TestWriteParallelPlan() throws IllegalPathException { + QueryId queryId = new QueryId("test_write"); + InsertRowNode insertRowNode = new InsertRowNode( + queryId.genPlanNodeId(), + new PartialPath("root.sg.d1"), + false, + new MeasurementSchema[]{ + new MeasurementSchema("s1", TSDataType.INT32), + }, + new TSDataType[]{ + TSDataType.INT32 + }, + 1L, + new Object[]{ + 10 + }); + + Analysis analysis = constructAnalysis(); + + MPPQueryContext context = + new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint()); + DistributionPlanner planner = + new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode)); + DistributedQueryPlan plan = planner.planFragments(); + plan.getInstances().forEach(System.out::println); + assertEquals(1, plan.getInstances().size()); + } + private Analysis constructAnalysis() { Analysis analysis = new Analysis(); @@ -228,7 +261,7 @@ public class DistributionPlannerTest { new DataNodeLocation(21, new Endpoint("192.0.2.1", 9000)), new DataNodeLocation(22, new Endpoint("192.0.2.2", 9000))))); Map<TimePartitionSlot, List<RegionReplicaSet>> d1DataRegionMap = new HashMap<>(); - d1DataRegionMap.put(new TimePartitionSlot(), d1DataRegions); + d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegions); List<RegionReplicaSet> d2DataRegions = new ArrayList<>(); d2DataRegions.add( @@ -238,7 +271,7 @@ public class DistributionPlannerTest { new DataNodeLocation(31, new Endpoint("192.0.3.1", 9000)), new DataNodeLocation(32, new Endpoint("192.0.3.2", 9000))))); Map<TimePartitionSlot, List<RegionReplicaSet>> d2DataRegionMap = new HashMap<>(); - d2DataRegionMap.put(new TimePartitionSlot(), d2DataRegions); + d2DataRegionMap.put(new TimePartitionSlot(0), d2DataRegions); List<RegionReplicaSet> d3DataRegions = new ArrayList<>(); d3DataRegions.add( @@ -254,7 +287,7 @@ public class DistributionPlannerTest { new DataNodeLocation(41, new Endpoint("192.0.4.1", 9000)), new DataNodeLocation(42, new Endpoint("192.0.4.2", 9000))))); Map<TimePartitionSlot, List<RegionReplicaSet>> d3DataRegionMap = new HashMap<>(); - d3DataRegionMap.put(new TimePartitionSlot(), d3DataRegions); + d3DataRegionMap.put(new TimePartitionSlot(0), d3DataRegions); sgPartitionMap.put(new SeriesPartitionSlot(device1.length()), d1DataRegionMap); sgPartitionMap.put(new SeriesPartitionSlot(device2.length()), d2DataRegionMap); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java index dea1e9b031..f57f60dda6 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.db.mpp.sql.plan; +import com.google.common.collect.ImmutableList; +import org.apache.iotdb.commons.cluster.DataNodeLocation; import org.apache.iotdb.commons.cluster.Endpoint; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.partition.RegionReplicaSet; @@ -54,16 +56,16 @@ public class FragmentInstanceSerdeTest { @Test public void TestSerializeAndDeserializeForTree1() throws IllegalPathException { + PlanFragmentId planFragmentId = new PlanFragmentId("test", -1); FragmentInstance fragmentInstance = new FragmentInstance( - new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()), - -1, + new PlanFragment(planFragmentId, constructPlanNodeTree()), + planFragmentId.genFragmentInstanceId(), new GroupByFilter(1, 2, 3, 4), QueryType.READ); RegionReplicaSet regionReplicaSet = - new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()); - fragmentInstance.setDataRegionId(regionReplicaSet); - fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666)); + new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666)))); + fragmentInstance.setDataRegionAndHost(regionReplicaSet); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); fragmentInstance.serializeRequest(byteBuffer); @@ -74,16 +76,16 @@ public class FragmentInstanceSerdeTest { @Test public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException { + PlanFragmentId planFragmentId = new PlanFragmentId("test2", 1); FragmentInstance fragmentInstance = new FragmentInstance( - new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()), - -1, + new PlanFragment(planFragmentId, constructPlanNodeTree()), + planFragmentId.genFragmentInstanceId(), null, QueryType.READ); RegionReplicaSet regionReplicaSet = - new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()); - fragmentInstance.setDataRegionId(regionReplicaSet); - fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667)); + new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667)))); + fragmentInstance.setDataRegionAndHost(regionReplicaSet); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); fragmentInstance.serializeRequest(byteBuffer);
