This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch support_table_model_redirect in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e025251a83ad22d2c287b72497776bc876e993d6 Author: HTHou <[email protected]> AuthorDate: Thu Sep 19 14:34:40 2024 +0800 dev_server_side --- .../db/queryengine/plan/analyze/Analysis.java | 1 + .../db/queryengine/plan/analyze/IAnalysis.java | 2 + .../planner/plan/node/write/InsertTabletNode.java | 10 ++--- .../node/write/RelationalInsertTabletNode.java | 43 ++++++++++++++++++++++ .../plan/relational/analyzer/Analysis.java | 5 +++ .../plan/relational/planner/TableModelPlanner.java | 39 +++++++++++++++++++- 6 files changed, 94 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index 11e8e42ec83..1f8905cc666 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -414,6 +414,7 @@ public class Analysis implements IAnalysis { this.schemaTree = schemaTree; } + @Override public List<TEndPoint> getRedirectNodeList() { return redirectNodeList; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java index 05c3638a95d..e03f75db230 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java @@ -66,6 +66,8 @@ public interface IAnalysis { DataPartition getDataPartitionInfo(); + List<TEndPoint> getRedirectNodeList(); + void setRedirectNodeList(List<TEndPoint> redirectNodeList); void addEndPointToRedirectNodeList(TEndPoint endPoint); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index d40d1c50b84..3a2027d1927 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -248,7 +248,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { return deviceIDSplitInfoMap; } - private Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet( + protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet( Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis analysis) { Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>(); @@ -1189,12 +1189,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { return deviceID; } - private static class PartitionSplitInfo { + protected static class PartitionSplitInfo { // for each List in split, they are range1.start, range1.end, range2.start, range2.end, ... - private List<Integer> ranges = new ArrayList<>(); - private List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>(); - private List<TRegionReplicaSet> replicaSets; + List<Integer> ranges = new ArrayList<>(); + List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>(); + List<TRegionReplicaSet> replicaSets; } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index c2129f8d8d7..b6b97457cff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -19,10 +19,13 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.exception.query.OutOfTTLException; +import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; @@ -41,7 +44,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.IntToLongFunction; public class RelationalInsertTabletNode extends InsertTabletNode { @@ -139,6 +144,44 @@ public class RelationalInsertTabletNode extends InsertTabletNode { columnCategories); } + protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet( + Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis analysis) { + Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>(); + Map<IDeviceID, TEndPoint> endPointMap = new HashMap<>(); + + for (Map.Entry<IDeviceID, PartitionSplitInfo> entry : deviceIDSplitInfoMap.entrySet()) { + final IDeviceID deviceID = entry.getKey(); + final PartitionSplitInfo splitInfo = entry.getValue(); + final List<TRegionReplicaSet> replicaSets = + analysis + .getDataPartitionInfo() + .getDataRegionReplicaSetForWriting( + deviceID, splitInfo.timePartitionSlots, analysis.getDatabaseName()); + splitInfo.replicaSets = replicaSets; + // collect redirectInfo + endPointMap.put( + deviceID, + replicaSets + .get(replicaSets.size() - 1) + .getDataNodeLocations() + .get(0) + .getClientRpcEndPoint()); + for (int i = 0; i < replicaSets.size(); i++) { + List<Integer> subRanges = + splitMap.computeIfAbsent(replicaSets.get(i), x -> new ArrayList<>()); + subRanges.add(splitInfo.ranges.get(2 * i)); + subRanges.add(splitInfo.ranges.get(2 * i + 1)); + } + } + List<TEndPoint> redirectNodeList = new ArrayList<>(times.length); + for (int i = 0; i < times.length; i++) { + IDeviceID deviceId = getDeviceID(i); + redirectNodeList.add(endPointMap.get(deviceId)); + } + analysis.setRedirectNodeList(redirectNodeList); + return splitMap; + } + public static RelationalInsertTabletNode deserialize(ByteBuffer byteBuffer) { RelationalInsertTabletNode insertNode = new RelationalInsertTabletNode(new PlanNodeId("")); insertNode.subDeserialize(byteBuffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index bb1678f6598..774f7576a48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -750,6 +750,11 @@ public class Analysis implements IAnalysis { } } + @Override + public List<TEndPoint> getRedirectNodeList() { + return redirectNodeList; + } + @Override public void setRedirectNodeList(List<TEndPoint> redirectNodeList) { this.redirectNodeList = redirectNodeList; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java index e750e2d95a8..762939c7c74 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java @@ -38,12 +38,18 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler; import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -149,7 +155,38 @@ public class TableModelPlanner implements IPlanner { @Override public void setRedirectInfo( - IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode) {} + IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode) { + Analysis analysis = (Analysis) iAnalysis; + + assert analysis.getStatement() instanceof WrappedInsertStatement; + InsertBaseStatement insertStatement = + ((WrappedInsertStatement) analysis.getStatement()).getInnerTreeStatement(); + + if (!analysis.isFinishQueryAfterAnalyze()) { + // Table Model Session only supports insertTablet + if (insertStatement instanceof InsertTabletStatement) { + if (statusCode == TSStatusCode.SUCCESS_STATUS) { + boolean needRedirect = false; + List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList(); + List<TSStatus> subStatus = new ArrayList<>(redirectNodeList.size()); + for (TEndPoint endPoint : redirectNodeList) { + // redirect writing only if the redirectEndPoint is not the current node + if (!localEndPoint.equals(endPoint)) { + subStatus.add( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint)); + needRedirect = true; + } else { + subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + } + } + if (needRedirect) { + tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); + tsstatus.setSubStatus(subStatus); + } + } + } + } + } private static class NopAccessControl implements AccessControl {} }
