This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 16bb63685b [IOTDB-4568] Added test api for partitions (#7497)
16bb63685b is described below
commit 16bb63685bc5295dd5199235e0c4e845c481c2cb
Author: Caideyipi <[email protected]>
AuthorDate: Mon Oct 10 13:50:13 2022 +0800
[IOTDB-4568] Added test api for partitions (#7497)
---
.../consensus/request/ConfigPhysicalPlan.java | 12 ++
.../consensus/request/ConfigPhysicalPlanType.java | 6 +-
.../request/read/GetNodePathsPartitionPlan.java | 1 +
.../consensus/request/read/GetRoutingPlan.java | 99 ++++++++++++++
...rtitionPlan.java => GetSeriesSlotListPlan.java} | 58 ++++-----
.../request/read/GetTimeSlotListPlan.java | 109 ++++++++++++++++
...chemaPartitionResp.java => GetRoutingResp.java} | 40 ++----
...rtitionResp.java => GetSeriesSlotListResp.java} | 40 ++----
.../consensus/response/GetTimeSlotListResp.java | 59 +++++++++
.../consensus/response/SchemaPartitionResp.java | 2 +-
.../iotdb/confignode/manager/ConfigManager.java | 55 ++++++--
.../apache/iotdb/confignode/manager/IManager.java | 19 ++-
.../manager/partition/PartitionManager.java | 29 ++++-
.../persistence/executor/ConfigPlanExecutor.java | 9 ++
.../persistence/partition/PartitionInfo.java | 45 ++++++-
.../partition/StorageGroupPartitionTable.java | 43 ++++--
.../thrift/ConfigNodeRPCServiceProcessor.java | 121 ++++++++++-------
.../request/ConfigPhysicalPlanSerDeTest.java | 47 ++++++-
.../iotdb/confignode/IoTDBClusterPartitionIT.java | 145 +++++++++++++++++++++
.../commons/partition/DataPartitionTable.java | 36 +++--
.../commons/partition/SchemaPartitionTable.java | 5 +
.../commons/partition/SeriesPartitionTable.java | 21 +++
.../org/apache/iotdb/commons/path/PartialPath.java | 1 -
.../apache/iotdb/db/client/ConfigNodeClient.java | 58 +++++++++
.../dataregion/StorageGroupManager.java | 4 +-
.../db/mpp/plan/analyze/cache/PartitionCache.java | 9 +-
.../src/main/thrift/confignode.thrift | 48 +++++++
27 files changed, 929 insertions(+), 192 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 745a70dde2..46b72279bf 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -27,8 +27,11 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionP
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetRoutingPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
@@ -271,6 +274,15 @@ public abstract class ConfigPhysicalPlan implements
IConsensusRequest {
case GetPipeSink:
req = new GetPipeSinkPlan();
break;
+ case GetRouting:
+ req = new GetRoutingPlan();
+ break;
+ case GetTimeSlotList:
+ req = new GetTimeSlotListPlan();
+ break;
+ case GetSeriesSlotList:
+ req = new GetSeriesSlotListPlan();
+ break;
default:
throw new IOException("unknown PhysicalPlan type: " + typeNum);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 30a00fff43..6f467b5c01 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -67,7 +67,6 @@ public enum ConfigPhysicalPlanType {
CreateFunction,
DropFunction,
GetRegionInfoList,
- GetDataNodesInfoList,
GetNodePathsPartition,
CreateSchemaTemplate,
GetAllSchemaTemplate,
@@ -85,5 +84,8 @@ public enum ConfigPhysicalPlanType {
AddTriggerInTable,
DeleteTriggerInTable,
GetTriggerTable,
- UpdateTriggerStateInTable
+ UpdateTriggerStateInTable,
+ GetRouting,
+ GetSeriesSlotList,
+ GetTimeSlotList
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionPlan.java
index ae8fc9b446..921144d5c7 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionPlan.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionPlan.java
@@ -55,6 +55,7 @@ public class GetNodePathsPartitionPlan extends
ConfigPhysicalPlan {
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(getType().ordinal());
partialPath.serialize(stream);
stream.writeInt(level);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetRoutingPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetRoutingPlan.java
new file mode 100644
index 0000000000..fdce6889c0
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetRoutingPlan.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.read;
+
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class GetRoutingPlan extends ConfigPhysicalPlan {
+
+ private String storageGroup;
+
+ private TSeriesPartitionSlot seriesSlotId;
+
+ private TTimePartitionSlot timeSlotId;
+
+ public GetRoutingPlan() {
+ super(ConfigPhysicalPlanType.GetRouting);
+ }
+
+ public GetRoutingPlan(
+ String storageGroup, TSeriesPartitionSlot seriesSlotId,
TTimePartitionSlot timeSlotId) {
+ this();
+ this.storageGroup = storageGroup;
+ this.seriesSlotId = seriesSlotId;
+ this.timeSlotId = timeSlotId;
+ }
+
+ public String getStorageGroup() {
+ return storageGroup;
+ }
+
+ public TSeriesPartitionSlot getSeriesSlotId() {
+ return seriesSlotId;
+ }
+
+ public TTimePartitionSlot getTimeSlotId() {
+ return timeSlotId;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(getType().ordinal());
+ ReadWriteIOUtils.write(storageGroup, stream);
+ ThriftCommonsSerDeUtils.serializeTSeriesPartitionSlot(seriesSlotId,
stream);
+ ThriftCommonsSerDeUtils.serializeTTimePartitionSlot(timeSlotId, stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ this.storageGroup = ReadWriteIOUtils.readString(buffer);
+ this.seriesSlotId =
ThriftCommonsSerDeUtils.deserializeTSeriesPartitionSlot(buffer);
+ this.timeSlotId =
ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ GetRoutingPlan that = (GetRoutingPlan) o;
+ return storageGroup.equals(that.storageGroup)
+ && seriesSlotId.equals(that.seriesSlotId)
+ && timeSlotId.equals(that.timeSlotId);
+ }
+
+ @Override
+ public int hashCode() {
+ int hashcode = 1;
+ hashcode = hashcode * 31 + Objects.hash(storageGroup);
+ hashcode = hashcode * 31 + seriesSlotId.hashCode();
+ hashcode = hashcode * 31 + timeSlotId.hashCode();
+ return hashcode;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetSeriesSlotListPlan.java
similarity index 51%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionPlan.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetSeriesSlotListPlan.java
index ae8fc9b446..9a27640589 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetNodePathsPartitionPlan.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetSeriesSlotListPlan.java
@@ -19,66 +19,66 @@
package org.apache.iotdb.confignode.consensus.request.read;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class GetNodePathsPartitionPlan extends ConfigPhysicalPlan {
- private PartialPath partialPath;
- private int level = -1;
+public class GetSeriesSlotListPlan extends ConfigPhysicalPlan {
- public GetNodePathsPartitionPlan() {
- super(ConfigPhysicalPlanType.GetNodePathsPartition);
- }
+ private String storageGroup;
+
+ private TConsensusGroupType partitionType;
- public PartialPath getPartialPath() {
- return partialPath;
+ public GetSeriesSlotListPlan() {
+ super(ConfigPhysicalPlanType.GetSeriesSlotList);
}
- public void setPartialPath(PartialPath partialPath) {
- this.partialPath = partialPath;
+ public GetSeriesSlotListPlan(String storageGroup, TConsensusGroupType
partitionType) {
+ this();
+ this.storageGroup = storageGroup;
+ this.partitionType = partitionType;
}
- public int getLevel() {
- return level;
+ public String getStorageGroup() {
+ return storageGroup;
}
- public void setLevel(int level) {
- this.level = level;
+ public TConsensusGroupType getPartitionType() {
+ return partitionType;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
- partialPath.serialize(stream);
- stream.writeInt(level);
+ stream.writeInt(getType().ordinal());
+ ReadWriteIOUtils.write(storageGroup, stream);
+ stream.writeInt(partitionType.ordinal());
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
- partialPath = (PartialPath) PathDeserializeUtil.deserialize(buffer);
- level = buffer.getInt();
+ this.storageGroup = ReadWriteIOUtils.readString(buffer);
+ this.partitionType = TConsensusGroupType.findByValue(buffer.getInt());
}
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- GetNodePathsPartitionPlan that = (GetNodePathsPartitionPlan) o;
- return level == that.level && Objects.equals(partialPath,
that.partialPath);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ GetSeriesSlotListPlan that = (GetSeriesSlotListPlan) o;
+ return storageGroup.equals(that.storageGroup) &&
partitionType.equals(that.partitionType);
}
@Override
public int hashCode() {
- return Objects.hash(partialPath, level);
+ int hashcode = 1;
+ hashcode = hashcode * 31 + Objects.hash(storageGroup);
+ hashcode = hashcode * 31 + partitionType.ordinal();
+ return hashcode;
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTimeSlotListPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTimeSlotListPlan.java
new file mode 100644
index 0000000000..99e8603253
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTimeSlotListPlan.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.read;
+
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class GetTimeSlotListPlan extends ConfigPhysicalPlan {
+
+ private String storageGroup;
+
+ private TSeriesPartitionSlot seriesSlotId;
+
+ private long startTime;
+
+ private long endTime;
+
+ public GetTimeSlotListPlan() {
+ super(ConfigPhysicalPlanType.GetTimeSlotList);
+ }
+
+ public GetTimeSlotListPlan(
+ String storageGroup, TSeriesPartitionSlot seriesSlotId, long startTime,
long endTime) {
+ this();
+ this.storageGroup = storageGroup;
+ this.seriesSlotId = seriesSlotId;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ public String getStorageGroup() {
+ return storageGroup;
+ }
+
+ public TSeriesPartitionSlot getSeriesSlotId() {
+ return seriesSlotId;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(getType().ordinal());
+ ReadWriteIOUtils.write(storageGroup, stream);
+ ThriftCommonsSerDeUtils.serializeTSeriesPartitionSlot(seriesSlotId,
stream);
+ stream.writeLong(startTime);
+ stream.writeLong(endTime);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ this.storageGroup = ReadWriteIOUtils.readString(buffer);
+ this.seriesSlotId =
ThriftCommonsSerDeUtils.deserializeTSeriesPartitionSlot(buffer);
+ this.startTime = buffer.getLong();
+ this.endTime = buffer.getLong();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ GetTimeSlotListPlan that = (GetTimeSlotListPlan) o;
+ return storageGroup.equals(that.storageGroup)
+ && seriesSlotId.equals(that.seriesSlotId)
+ && startTime == that.startTime
+ && endTime == that.endTime;
+ }
+
+ @Override
+ public int hashCode() {
+ int hashcode = 1;
+ hashcode = hashcode * 31 + Objects.hash(storageGroup);
+ hashcode = hashcode * 31 + seriesSlotId.hashCode();
+ hashcode = hashcode * 31 + (int) startTime;
+ hashcode = hashcode * 31 + (int) endTime;
+ return hashcode;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/GetRoutingResp.java
similarity index 50%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/GetRoutingResp.java
index 91ee87ead0..cf1819c816 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/GetRoutingResp.java
@@ -21,32 +21,21 @@ package org.apache.iotdb.confignode.consensus.response;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
-public class SchemaPartitionResp implements DataSet {
+public class GetRoutingResp implements DataSet {
private TSStatus status;
- private final boolean allPartitionsExist;
+ private final List<TConsensusGroupId> dataRegionIdList;
- // Map<StorageGroup, SchemaPartitionTable>
- // TODO: Replace this map whit new SchemaPartition
- private final Map<String, SchemaPartitionTable> schemaPartition;
-
- public SchemaPartitionResp(
- TSStatus status,
- boolean allPartitionsExist,
- Map<String, SchemaPartitionTable> schemaPartition) {
+ public GetRoutingResp(TSStatus status, List<TConsensusGroupId>
dataRegionIdList) {
this.status = status;
- this.allPartitionsExist = allPartitionsExist;
- this.schemaPartition = schemaPartition;
+ this.dataRegionIdList = dataRegionIdList;
}
public TSStatus getStatus() {
@@ -57,23 +46,12 @@ public class SchemaPartitionResp implements DataSet {
this.status = status;
}
- public boolean isAllPartitionsExist() {
- return allPartitionsExist;
- }
-
- public TSchemaPartitionTableResp convertToRpcSchemaPartitionTableResp() {
- TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
+ public TGetRoutingResp convertToRpcGetRoutingResp() {
+ TGetRoutingResp resp = new TGetRoutingResp();
resp.setStatus(status);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>>
schemaPartitionMap =
- new ConcurrentHashMap<>();
-
- schemaPartition.forEach(
- (storageGroup, schemaPartitionTable) ->
- schemaPartitionMap.put(storageGroup,
schemaPartitionTable.getSchemaPartitionMap()));
-
- resp.setSchemaPartitionTable(schemaPartitionMap);
+ resp.dataRegionIdList = dataRegionIdList;
}
return resp;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/GetSeriesSlotListResp.java
similarity index 50%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/GetSeriesSlotListResp.java
index 91ee87ead0..e094732319 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/GetSeriesSlotListResp.java
@@ -19,34 +19,23 @@
package org.apache.iotdb.confignode.consensus.response;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
-public class SchemaPartitionResp implements DataSet {
+public class GetSeriesSlotListResp implements DataSet {
private TSStatus status;
- private final boolean allPartitionsExist;
+ private final List<TSeriesPartitionSlot> seriesSlotList;
- // Map<StorageGroup, SchemaPartitionTable>
- // TODO: Replace this map whit new SchemaPartition
- private final Map<String, SchemaPartitionTable> schemaPartition;
-
- public SchemaPartitionResp(
- TSStatus status,
- boolean allPartitionsExist,
- Map<String, SchemaPartitionTable> schemaPartition) {
+ public GetSeriesSlotListResp(TSStatus status, List<TSeriesPartitionSlot>
seriesSlotList) {
this.status = status;
- this.allPartitionsExist = allPartitionsExist;
- this.schemaPartition = schemaPartition;
+ this.seriesSlotList = seriesSlotList;
}
public TSStatus getStatus() {
@@ -57,23 +46,12 @@ public class SchemaPartitionResp implements DataSet {
this.status = status;
}
- public boolean isAllPartitionsExist() {
- return allPartitionsExist;
- }
-
- public TSchemaPartitionTableResp convertToRpcSchemaPartitionTableResp() {
- TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
+ public TGetSeriesSlotListResp convertToRpcGetSeriesSlotListResp() {
+ TGetSeriesSlotListResp resp = new TGetSeriesSlotListResp();
resp.setStatus(status);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>>
schemaPartitionMap =
- new ConcurrentHashMap<>();
-
- schemaPartition.forEach(
- (storageGroup, schemaPartitionTable) ->
- schemaPartitionMap.put(storageGroup,
schemaPartitionTable.getSchemaPartitionMap()));
-
- resp.setSchemaPartitionTable(schemaPartitionMap);
+ resp.seriesSlotList = seriesSlotList;
}
return resp;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/GetTimeSlotListResp.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/GetTimeSlotListResp.java
new file mode 100644
index 0000000000..e6230ca602
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/GetTimeSlotListResp.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.List;
+
+public class GetTimeSlotListResp implements DataSet {
+
+ private TSStatus status;
+
+ private final List<TTimePartitionSlot> timeSlotList;
+
+ public GetTimeSlotListResp(TSStatus status, List<TTimePartitionSlot>
timeSlotList) {
+ this.status = status;
+ this.timeSlotList = timeSlotList;
+ }
+
+ public TSStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(TSStatus status) {
+ this.status = status;
+ }
+
+ public TGetTimeSlotListResp convertToRpcGetTimeSlotListResp() {
+ TGetTimeSlotListResp resp = new TGetTimeSlotListResp();
+ resp.setStatus(status);
+
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ resp.timeSlotList = timeSlotList;
+ }
+
+ return resp;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
index 91ee87ead0..d08855a3fc 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
@@ -37,7 +37,7 @@ public class SchemaPartitionResp implements DataSet {
private final boolean allPartitionsExist;
// Map<StorageGroup, SchemaPartitionTable>
- // TODO: Replace this map whit new SchemaPartition
+ // TODO: Replace this map with new SchemaPartition
private final Map<String, SchemaPartitionTable> schemaPartition;
public SchemaPartitionResp(
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 445d69fd04..2e4729cbc0 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.utils.AuthUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
@@ -46,8 +47,11 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionP
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetRoutingPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
@@ -92,7 +96,10 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
@@ -206,11 +213,6 @@ public class ConfigManager implements IManager {
procedureManager.shiftExecutor(false);
}
- @Override
- public boolean isStopped() {
- return false;
- }
-
@Override
public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
TSStatus status = confirmLeader();
@@ -485,8 +487,7 @@ public class ConfigManager implements IManager {
new GetOrCreateSchemaPartitionPlan(partitionSlotsMap);
SchemaPartitionResp queryResult =
- (SchemaPartitionResp)
-
partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
+
partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
resp = queryResult.convertToRpcSchemaPartitionTableResp();
// TODO: Delete or hide this LOGGER before officially release.
@@ -508,8 +509,7 @@ public class ConfigManager implements IManager {
getNodePathsPartitionPlan.setLevel(level);
}
SchemaNodeManagementResp resp =
- (SchemaNodeManagementResp)
-
partitionManager.getNodePathsPartition(getNodePathsPartitionPlan);
+ partitionManager.getNodePathsPartition(getNodePathsPartitionPlan);
TSchemaNodeManagementResp result =
resp.convertToRpcSchemaNodeManagementPartitionResp(
getLoadManager().genLatestRegionRouteMap());
@@ -562,7 +562,7 @@ public class ConfigManager implements IManager {
}
DataPartitionResp queryResult =
- (DataPartitionResp)
partitionManager.getOrCreateDataPartition(getOrCreateDataPartitionReq);
+ partitionManager.getOrCreateDataPartition(getOrCreateDataPartitionReq);
resp = queryResult.convertToTDataPartitionTableResp();
@@ -847,7 +847,7 @@ public class ConfigManager implements IManager {
public RegionInfoListResp showRegion(GetRegionInfoListPlan
getRegionInfoListPlan) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return (RegionInfoListResp)
partitionManager.getRegionInfoList(getRegionInfoListPlan);
+ return partitionManager.getRegionInfoList(getRegionInfoListPlan);
} else {
RegionInfoListResp regionResp = new RegionInfoListResp();
regionResp.setStatus(status);
@@ -1010,7 +1010,36 @@ public class ConfigManager implements IManager {
}
}
- /** Get all related schemaRegion which may contains the timeseries matched
by given patternTree */
+ @Override
+ @TestOnly
+ public TGetRoutingResp getRouting(GetRoutingPlan plan) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? partitionManager.getRouting(plan).convertToRpcGetRoutingResp()
+ : new TGetRoutingResp(status);
+ }
+
+ @Override
+ @TestOnly
+ public TGetTimeSlotListResp getTimeSlotList(GetTimeSlotListPlan plan) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ?
partitionManager.getTimeSlotList(plan).convertToRpcGetTimeSlotListResp()
+ : new TGetTimeSlotListResp(status);
+ }
+
+ @Override
+ @TestOnly
+ public TGetSeriesSlotListResp getSeriesSlotList(GetSeriesSlotListPlan plan) {
+ TSStatus status = confirmLeader();
+ TGetSeriesSlotListResp resp = new TGetSeriesSlotListResp();
+ resp.setStatus(status);
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ?
partitionManager.getSeriesSlotList(plan).convertToRpcGetSeriesSlotListResp()
+ : new TGetSeriesSlotListResp(status);
+ }
+
+ /** Get all related schemaRegion which may contains the timeSeries matched
by given patternTree */
public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedSchemaRegionGroup(
PathPatternTree patternTree) {
Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>>
schemaPartitionTable =
@@ -1036,7 +1065,7 @@ public class ConfigManager implements IManager {
*/
public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup(
PathPatternTree patternTree) {
- // get all storage group and slot by getting schema partition
+ // get all storage groups and slots by getting schema partition
Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>>
schemaPartitionTable =
getSchemaPartition(patternTree).getSchemaPartitionTable();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 64519e0770..d028f3425c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -29,7 +29,10 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurati
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetRoutingPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
@@ -53,7 +56,10 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
@@ -75,13 +81,6 @@ import java.util.List;
*/
public interface IManager {
- /**
- * if a service stop
- *
- * @return true if service stopped
- */
- boolean isStopped();
-
/**
* Get DataManager
*
@@ -406,4 +405,10 @@ public interface IManager {
* @return TGetPipeSinkResp contains the PipeSink
*/
TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req);
+
+ TGetRoutingResp getRouting(GetRoutingPlan plan);
+
+ TGetTimeSlotListResp getTimeSlotList(GetTimeSlotListPlan plan);
+
+ TGetSeriesSlotListResp getSeriesSlotList(GetSeriesSlotListPlan plan);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 0c04b34b0d..8a666f53b8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -41,7 +41,10 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionP
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetRoutingPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
@@ -49,6 +52,9 @@ import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr
import
org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
import
org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.GetRoutingResp;
+import org.apache.iotdb.confignode.consensus.response.GetSeriesSlotListResp;
+import org.apache.iotdb.confignode.consensus.response.GetTimeSlotListResp;
import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
@@ -162,7 +168,7 @@ public class PartitionManager {
* finish. NOT_ENOUGH_DATA_NODE if the DataNodes is not enough to create
new Regions.
* STORAGE_GROUP_NOT_EXIST if some StorageGroup don't exist.
*/
- public DataSet getOrCreateSchemaPartition(GetOrCreateSchemaPartitionPlan
req) {
+ public SchemaPartitionResp
getOrCreateSchemaPartition(GetOrCreateSchemaPartitionPlan req) {
// After all the SchemaPartitions are allocated,
// all the read requests about SchemaPartitionTable are parallel.
SchemaPartitionResp resp = (SchemaPartitionResp) getSchemaPartition(req);
@@ -226,7 +232,7 @@ public class PartitionManager {
}
}
- return getSchemaPartition(req);
+ return (SchemaPartitionResp) getSchemaPartition(req);
}
/**
@@ -238,7 +244,7 @@ public class PartitionManager {
* finish. NOT_ENOUGH_DATA_NODE if the DataNodes is not enough to create
new Regions.
* STORAGE_GROUP_NOT_EXIST if some StorageGroup don't exist.
*/
- public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionPlan req) {
+ public DataPartitionResp
getOrCreateDataPartition(GetOrCreateDataPartitionPlan req) {
// After all the DataPartitions are allocated,
// all the read requests about DataPartitionTable are parallel.
DataPartitionResp resp = (DataPartitionResp) getDataPartition(req);
@@ -303,7 +309,7 @@ public class PartitionManager {
}
}
- return getDataPartition(req);
+ return (DataPartitionResp) getDataPartition(req);
}
// ======================================================
@@ -515,7 +521,7 @@ public class PartitionManager {
* @return SchemaNodeManagementPartitionDataSet that contains only existing
matched
* SchemaPartition and matched child paths aboveMTree
*/
- public DataSet getNodePathsPartition(GetNodePathsPartitionPlan physicalPlan)
{
+ public SchemaNodeManagementResp
getNodePathsPartition(GetNodePathsPartitionPlan physicalPlan) {
SchemaNodeManagementResp schemaNodeManagementResp;
ConsensusReadResponse consensusReadResponse =
getConsensusManager().read(physicalPlan);
schemaNodeManagementResp = (SchemaNodeManagementResp)
consensusReadResponse.getDataset();
@@ -543,7 +549,7 @@ public class PartitionManager {
return executor.getSeriesPartitionSlot(devicePath);
}
- public DataSet getRegionInfoList(GetRegionInfoListPlan req) {
+ public RegionInfoListResp getRegionInfoList(GetRegionInfoListPlan req) {
// Get static result
RegionInfoListResp regionInfoListResp =
(RegionInfoListResp) getConsensusManager().read(req).getDataset();
@@ -586,6 +592,17 @@ public class PartitionManager {
return getConsensusManager().write(req).getStatus();
}
+ public GetRoutingResp getRouting(GetRoutingPlan plan) {
+ return (GetRoutingResp) getConsensusManager().read(plan).getDataset();
+ }
+
+ public GetTimeSlotListResp getTimeSlotList(GetTimeSlotListPlan plan) {
+ return (GetTimeSlotListResp) getConsensusManager().read(plan).getDataset();
+ }
+
+ public GetSeriesSlotListResp getSeriesSlotList(GetSeriesSlotListPlan plan) {
+ return (GetSeriesSlotListResp)
getConsensusManager().read(plan).getDataset();
+ }
/**
* get storage group for region
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 4fc66d6a81..64b488cdca 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -30,8 +30,11 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurati
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetRoutingPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
@@ -172,6 +175,12 @@ public class ConfigPlanExecutor {
return syncInfo.getPipeSink((GetPipeSinkPlan) req);
case GetTriggerTable:
return triggerInfo.getTriggerTable();
+ case GetRouting:
+ return partitionInfo.getRouting((GetRoutingPlan) req);
+ case GetTimeSlotList:
+ return partitionInfo.getTimeSlotList((GetTimeSlotListPlan) req);
+ case GetSeriesSlotList:
+ return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req);
default:
throw new UnknownPhysicalPlanTypeException(req.getType());
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index c94a90358c..93d637cf89 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -31,7 +31,10 @@ import
org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetRoutingPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
@@ -41,6 +44,9 @@ import
org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteSt
import
org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
import
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.GetRoutingResp;
+import org.apache.iotdb.confignode.consensus.response.GetSeriesSlotListResp;
+import org.apache.iotdb.confignode.consensus.response.GetTimeSlotListResp;
import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
@@ -249,7 +255,7 @@ public class PartitionInfo implements SnapshotProcessor {
*/
public DataSet getSchemaPartition(GetSchemaPartitionPlan plan) {
AtomicBoolean isAllPartitionsExist = new AtomicBoolean(true);
- // TODO: Replace this map whit new SchemaPartition
+ // TODO: Replace this map with new SchemaPartition
Map<String, SchemaPartitionTable> schemaPartition = new
ConcurrentHashMap<>();
if (plan.getPartitionSlotsMap().size() == 0) {
@@ -770,6 +776,43 @@ public class PartitionInfo implements SnapshotProcessor {
}
}
+ public DataSet getRouting(GetRoutingPlan plan) {
+ if (!isStorageGroupExisted(plan.getStorageGroup())) {
+ return new GetRoutingResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new
ArrayList<>());
+ }
+ StorageGroupPartitionTable sgPartitionTable =
+ storageGroupPartitionTables.get(plan.getStorageGroup());
+ return new GetRoutingResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+ sgPartitionTable.getRouting(plan.getSeriesSlotId(),
plan.getTimeSlotId()));
+ }
+
+ public DataSet getTimeSlotList(GetTimeSlotListPlan plan) {
+ if (!isStorageGroupExisted(plan.getStorageGroup())) {
+ return new GetTimeSlotListResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new
ArrayList<>());
+ }
+ StorageGroupPartitionTable sgPartitionTable =
+ storageGroupPartitionTables.get(plan.getStorageGroup());
+ return new GetTimeSlotListResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+ sgPartitionTable.getTimeSlotList(
+ plan.getSeriesSlotId(), plan.getStartTime(), plan.getEndTime()));
+ }
+
+ public DataSet getSeriesSlotList(GetSeriesSlotListPlan plan) {
+ if (!isStorageGroupExisted(plan.getStorageGroup())) {
+ return new GetSeriesSlotListResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new
ArrayList<>());
+ }
+ StorageGroupPartitionTable sgPartitionTable =
+ storageGroupPartitionTables.get(plan.getStorageGroup());
+ return new GetSeriesSlotListResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+ sgPartitionTable.getSeriesSlotList(plan.getPartitionType()));
+ }
+
public int getStorageGroupPartitionTableSize() {
return storageGroupPartitionTables.size();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 5ba83457c8..3a7fafb2cb 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -50,6 +50,8 @@ import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class StorageGroupPartitionTable {
private static final Logger LOGGER =
LoggerFactory.getLogger(StorageGroupPartitionTable.class);
@@ -366,6 +368,31 @@ public class StorageGroupPartitionTable {
dataPartitionTable.deserialize(inputStream, protocol);
}
+ public List<TConsensusGroupId> getRouting(
+ TSeriesPartitionSlot seriesSlotId, TTimePartitionSlot timeSlotId) {
+ return dataPartitionTable.getRouting(seriesSlotId, timeSlotId);
+ }
+
+ public List<TTimePartitionSlot> getTimeSlotList(
+ TSeriesPartitionSlot seriesSlotId, long startTime, long endTime) {
+ return dataPartitionTable.getTimeSlotList(seriesSlotId, startTime,
endTime);
+ }
+
+ public List<TSeriesPartitionSlot> getSeriesSlotList(TConsensusGroupType
type) {
+ switch (type) {
+ case DataRegion:
+ return dataPartitionTable.getSeriesSlotList();
+ case SchemaRegion:
+ return schemaPartitionTable.getSeriesSlotList();
+ case PartitionRegion:
+ default:
+ return Stream.concat(
+ schemaPartitionTable.getSeriesSlotList().stream(),
+ dataPartitionTable.getSeriesSlotList().stream())
+ .distinct()
+ .collect(Collectors.toList());
+ }
+ }
/**
* update region location
*
@@ -379,33 +406,33 @@ public class StorageGroupPartitionTable {
removeRegionOldLocation(regionId, oldNode);
}
- private boolean addRegionNewLocation(TConsensusGroupId regionId,
TDataNodeLocation node) {
+ private void addRegionNewLocation(TConsensusGroupId regionId,
TDataNodeLocation node) {
RegionGroup regionGroup = regionGroupMap.get(regionId);
if (regionGroup == null) {
LOGGER.warn("not find Region Group for region {}", regionId);
- return false;
+ return;
}
if (regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
LOGGER.info("Node is already in region locations, node: {}, region: {}",
node, regionId);
- return true;
+ return;
}
- return regionGroup.getReplicaSet().getDataNodeLocations().add(node);
+ regionGroup.getReplicaSet().getDataNodeLocations().add(node);
}
- private boolean removeRegionOldLocation(TConsensusGroupId regionId,
TDataNodeLocation node) {
+ private void removeRegionOldLocation(TConsensusGroupId regionId,
TDataNodeLocation node) {
RegionGroup regionGroup = regionGroupMap.get(regionId);
if (regionGroup == null) {
LOGGER.warn("not find Region Group for region {}", regionId);
- return false;
+ return;
}
if (!regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
LOGGER.info(
"Node is Not in region locations, no need to remove it, node: {},
region: {}",
node,
regionId);
- return true;
+ return;
}
- return regionGroup.getReplicaSet().getDataNodeLocations().remove(node);
+ regionGroup.getReplicaSet().getDataNodeLocations().remove(node);
}
/**
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index d71919e555..5638835c5c 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.service.thrift;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
@@ -39,7 +40,10 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurati
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetRoutingPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
@@ -87,7 +91,13 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -148,7 +158,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req)
throws TException {
+ public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) {
TDataNodeRegisterResp resp =
((DataNodeRegisterResp)
configManager.registerDataNode(
@@ -162,7 +172,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TDataNodeRemoveResp removeDataNode(TDataNodeRemoveReq req) throws
TException {
+ public TDataNodeRemoveResp removeDataNode(TDataNodeRemoveReq req) {
LOGGER.info("ConfigNode RPC Service start to remove DataNode, req: {}",
req);
RemoveDataNodePlan removeDataNodePlan = new
RemoveDataNodePlan(req.getDataNodeLocations());
DataNodeToStatusResp removeResp =
@@ -174,7 +184,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeID)
throws TException {
+ public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeID) {
GetDataNodeConfigurationPlan queryReq = new
GetDataNodeConfigurationPlan(dataNodeID);
DataNodeConfigurationResp queryResp =
(DataNodeConfigurationResp)
configManager.getDataNodeConfiguration(queryReq);
@@ -185,12 +195,12 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req)
throws TException {
+ public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req)
{
return configManager.reportRegionMigrateResult(req);
}
@Override
- public TShowClusterResp showCluster() throws TException {
+ public TShowClusterResp showCluster() {
return configManager.showCluster();
}
@@ -229,13 +239,13 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus deleteStorageGroup(TDeleteStorageGroupReq tDeleteReq) throws
TException {
+ public TSStatus deleteStorageGroup(TDeleteStorageGroupReq tDeleteReq) {
String prefixPath = tDeleteReq.getPrefixPath();
return
configManager.deleteStorageGroups(Collections.singletonList(prefixPath));
}
@Override
- public TSStatus deleteStorageGroups(TDeleteStorageGroupsReq tDeleteReq)
throws TException {
+ public TSStatus deleteStorageGroups(TDeleteStorageGroupsReq tDeleteReq) {
List<String> prefixList = tDeleteReq.getPrefixPathList();
return configManager.deleteStorageGroups(prefixList);
}
@@ -265,8 +275,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TCountStorageGroupResp countMatchedStorageGroups(List<String>
storageGroupPathPattern)
- throws TException {
+ public TCountStorageGroupResp countMatchedStorageGroups(List<String>
storageGroupPathPattern) {
CountStorageGroupResp countStorageGroupResp =
(CountStorageGroupResp)
configManager.countMatchedStorageGroups(
@@ -278,8 +287,8 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TStorageGroupSchemaResp getMatchedStorageGroupSchemas(List<String>
storageGroupPathPattern)
- throws TException {
+ public TStorageGroupSchemaResp getMatchedStorageGroupSchemas(
+ List<String> storageGroupPathPattern) {
StorageGroupSchemaResp storageGroupSchemaResp =
(StorageGroupSchemaResp)
configManager.getMatchedStorageGroupSchemas(
@@ -289,24 +298,21 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSchemaPartitionTableResp getSchemaPartitionTable(TSchemaPartitionReq
req)
- throws TException {
+ public TSchemaPartitionTableResp getSchemaPartitionTable(TSchemaPartitionReq
req) {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
return configManager.getSchemaPartition(patternTree);
}
@Override
- public TSchemaPartitionTableResp
getOrCreateSchemaPartitionTable(TSchemaPartitionReq req)
- throws TException {
+ public TSchemaPartitionTableResp
getOrCreateSchemaPartitionTable(TSchemaPartitionReq req) {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
return configManager.getOrCreateSchemaPartition(patternTree);
}
@Override
- public TSchemaNodeManagementResp
getSchemaNodeManagementPartition(TSchemaNodeManagementReq req)
- throws TException {
+ public TSchemaNodeManagementResp
getSchemaNodeManagementPartition(TSchemaNodeManagementReq req) {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
PartialPath partialPath = patternTree.getAllPathPatterns().get(0);
@@ -314,7 +320,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TDataPartitionTableResp getDataPartitionTable(TDataPartitionReq req)
throws TException {
+ public TDataPartitionTableResp getDataPartitionTable(TDataPartitionReq req) {
GetDataPartitionPlan getDataPartitionPlan =
GetDataPartitionPlan.convertFromRpcTDataPartitionReq(req);
return configManager.getDataPartition(getDataPartitionPlan);
@@ -329,7 +335,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus operatePermission(TAuthorizerReq req) throws TException {
+ public TSStatus operatePermission(TAuthorizerReq req) {
if (req.getAuthorType() < 0
|| req.getAuthorType() >= AuthorOperator.AuthorType.values().length) {
throw new IndexOutOfBoundsException("Invalid Author Type ordinal");
@@ -353,7 +359,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TAuthorizerResp queryPermission(TAuthorizerReq req) throws TException
{
+ public TAuthorizerResp queryPermission(TAuthorizerReq req) {
if (req.getAuthorType() < 0
|| req.getAuthorType() >= AuthorOperator.AuthorType.values().length) {
throw new IndexOutOfBoundsException("Invalid Author Type ordinal");
@@ -380,18 +386,18 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TPermissionInfoResp login(TLoginReq req) throws TException {
+ public TPermissionInfoResp login(TLoginReq req) {
return configManager.login(req.getUserrname(), req.getPassword());
}
@Override
- public TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req)
throws TException {
+ public TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req) {
return configManager.checkUserPrivileges(
req.getUsername(), req.getPaths(), req.getPermission());
}
@Override
- public TSStatus registerConfigNode(TConfigNodeRegisterReq req) throws
TException {
+ public TSStatus registerConfigNode(TConfigNodeRegisterReq req) {
TSStatus status = configManager.registerConfigNode(req);
// Print log to record the ConfigNode that performs the
RegisterConfigNodeRequest
@@ -406,7 +412,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus notifyRegisterSuccess() throws TException {
+ public TSStatus notifyRegisterSuccess() {
try {
SystemPropertiesUtils.storeSystemParameters();
} catch (IOException e) {
@@ -432,7 +438,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus removeConsensusGroup(TConfigNodeLocation configNodeLocation)
throws TException {
+ public TSStatus removeConsensusGroup(TConfigNodeLocation configNodeLocation)
{
if
(!configManager.getNodeManager().getRegisteredConfigNodes().contains(configNodeLocation))
{
return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
.setMessage(
@@ -475,21 +481,21 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus dropFunction(TDropFunctionReq req) throws TException {
+ public TSStatus dropFunction(TDropFunctionReq req) {
return configManager.dropFunction(req.getUdfName());
}
@Override
- public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
+ public TSStatus createTrigger(TCreateTriggerReq req) {
return configManager.createTrigger(req);
}
@Override
- public TSStatus dropTrigger(TDropTriggerReq req) throws TException {
+ public TSStatus dropTrigger(TDropTriggerReq req) {
return configManager.dropTrigger(req);
}
- public TGetTriggerTableResp getTriggerTable() throws TException {
+ public TGetTriggerTableResp getTriggerTable() {
return configManager.getTriggerTable();
}
@@ -515,7 +521,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus clearCache() throws TException {
+ public TSStatus clearCache() {
return configManager.clearCache();
}
@@ -530,7 +536,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TShowRegionResp showRegion(TShowRegionReq showRegionReq) throws
TException {
+ public TShowRegionResp showRegion(TShowRegionReq showRegionReq) {
GetRegionInfoListPlan getRegionInfoListPlan = new
GetRegionInfoListPlan(showRegionReq);
RegionInfoListResp dataSet =
configManager.showRegion(getRegionInfoListPlan);
TShowRegionResp showRegionResp = new TShowRegionResp();
@@ -540,7 +546,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
+ public TRegionRouteMapResp getLatestRegionRouteMap() {
TRegionRouteMapResp resp = configManager.getLatestRegionRouteMap();
if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LoadManager.printRegionRouteMap(resp.getTimestamp(),
resp.getRegionRouteMap());
@@ -549,17 +555,17 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public long getConfigNodeHeartBeat(long timestamp) throws TException {
+ public long getConfigNodeHeartBeat(long timestamp) {
return timestamp;
}
@Override
- public TShowDataNodesResp showDataNodes() throws TException {
+ public TShowDataNodesResp showDataNodes() {
return configManager.showDataNodes();
}
@Override
- public TShowConfigNodesResp showConfigNodes() throws TException {
+ public TShowConfigNodesResp showConfigNodes() {
return configManager.showConfigNodes();
}
@@ -570,47 +576,74 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) throws
TException {
+ public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) {
return configManager.createSchemaTemplate(req);
}
@Override
- public TGetAllTemplatesResp getAllTemplates() throws TException {
+ public TGetAllTemplatesResp getAllTemplates() {
return configManager.getAllTemplates();
}
@Override
- public TGetTemplateResp getTemplate(String req) throws TException {
+ public TGetTemplateResp getTemplate(String req) {
return configManager.getTemplate(req);
}
@Override
- public TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) throws
TException {
+ public TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) {
return configManager.setSchemaTemplate(req);
}
@Override
- public TGetPathsSetTemplatesResp getPathsSetTemplate(String req) throws
TException {
+ public TGetPathsSetTemplatesResp getPathsSetTemplate(String req) {
return configManager.getPathsSetTemplate(req);
}
@Override
- public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) throws TException
{
+ public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
return configManager.deleteTimeSeries(req);
}
@Override
- public TSStatus createPipeSink(TPipeSinkInfo req) throws TException {
+ public TSStatus createPipeSink(TPipeSinkInfo req) {
return configManager.createPipeSink(new CreatePipeSinkPlan(req));
}
@Override
- public TSStatus dropPipeSink(TDropPipeSinkReq req) throws TException {
+ public TSStatus dropPipeSink(TDropPipeSinkReq req) {
return configManager.dropPipeSink(new
DropPipeSinkPlan(req.getPipeSinkName()));
}
@Override
- public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) throws TException {
+ public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) {
return configManager.getPipeSink(req);
}
+
+ @Override
+ @TestOnly
+ public TGetRoutingResp getRouting(TGetRoutingReq req) {
+ GetRoutingPlan plan =
+ new GetRoutingPlan(req.getStorageGroup(), req.getSeriesSlotId(),
req.getTimeSlotId());
+ return configManager.getRouting(plan);
+ }
+
+ @Override
+ @TestOnly
+ public TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
+ long startTime = req.isSetStartTime() ? req.getStartTime() :
Long.MIN_VALUE;
+ long endTime = req.isSetEndTime() ? req.getEndTime() : Long.MAX_VALUE;
+ GetTimeSlotListPlan plan =
+ new GetTimeSlotListPlan(req.getStorageGroup(), req.getSeriesSlotId(),
startTime, endTime);
+ return configManager.getTimeSlotList(plan);
+ }
+
+ @Override
+ @TestOnly
+ public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) {
+ TConsensusGroupType type =
+ req.isSetType() ? req.getType() : TConsensusGroupType.PartitionRegion;
+ GetSeriesSlotListPlan plan = new
GetSeriesSlotListPlan(req.getStorageGroup(), type);
+ return configManager.getSeriesSlotList(plan);
+ }
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 054bf92aac..0ba97cb955 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -40,11 +40,15 @@ import
org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import
org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetRoutingPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
@@ -737,7 +741,7 @@ public class ConfigPhysicalPlanSerDeTest {
}
@Test
- public void GetRegionLocaltionsPlanTest() throws IOException {
+ public void GetRegionLocationsPlanTest() throws IOException {
GetRegionInfoListPlan req0 = new GetRegionInfoListPlan();
TShowRegionReq showRegionReq = new TShowRegionReq();
req0.setShowRegionReq(showRegionReq);
@@ -811,7 +815,17 @@ public class ConfigPhysicalPlanSerDeTest {
}
@Test
- public void GetAllTemplateSetInfoPlan() throws IOException {
+ public void GetNodePathsPartitionPlanTest() throws IOException,
IllegalPathException {
+ GetNodePathsPartitionPlan getNodePathsPartitionPlan0 = new
GetNodePathsPartitionPlan();
+ getNodePathsPartitionPlan0.setPartialPath(new PartialPath("root.sg1.**"));
+ GetNodePathsPartitionPlan getNodePathsPartitionPlan1 =
+ (GetNodePathsPartitionPlan)
+
ConfigPhysicalPlan.Factory.create(getNodePathsPartitionPlan0.serializeToByteBuffer());
+ Assert.assertEquals(getNodePathsPartitionPlan0,
getNodePathsPartitionPlan1);
+ }
+
+ @Test
+ public void GetAllTemplateSetInfoPlanTest() throws IOException {
GetAllTemplateSetInfoPlan getAllTemplateSetInfoPlan = new
GetAllTemplateSetInfoPlan();
Assert.assertTrue(
ConfigPhysicalPlan.Factory.create(getAllTemplateSetInfoPlan.serializeToByteBuffer())
@@ -941,4 +955,33 @@ public class ConfigPhysicalPlanSerDeTest {
updateTriggerStateInTablePlan0.getTriggerState(),
updateTriggerStateInTablePlan1.getTriggerState());
}
+
+ @Test
+ public void GetRoutingPlanTest() throws IOException {
+ GetRoutingPlan getRoutingPlan0 =
+ new GetRoutingPlan("root.test", new TSeriesPartitionSlot(1), new
TTimePartitionSlot(0));
+ GetRoutingPlan getRoutingPlan1 =
+ (GetRoutingPlan)
ConfigPhysicalPlan.Factory.create(getRoutingPlan0.serializeToByteBuffer());
+ Assert.assertEquals(getRoutingPlan0, getRoutingPlan1);
+ }
+
+ @Test
+ public void GetTimeSlotListPlanTest() throws IOException {
+ GetTimeSlotListPlan getTimeSlotListPlan0 =
+ new GetTimeSlotListPlan("root.test", new TSeriesPartitionSlot(1), 0,
Long.MAX_VALUE);
+ GetTimeSlotListPlan getTimeSlotListPlan1 =
+ (GetTimeSlotListPlan)
+
ConfigPhysicalPlan.Factory.create(getTimeSlotListPlan0.serializeToByteBuffer());
+ Assert.assertEquals(getTimeSlotListPlan0, getTimeSlotListPlan1);
+ }
+
+ @Test
+ public void GetSeriesSlotListPlanTest() throws IOException {
+ GetSeriesSlotListPlan getSeriesSlotListPlan0 =
+ new GetSeriesSlotListPlan("root.test", SchemaRegion);
+ GetSeriesSlotListPlan getSeriesSlotListPlan1 =
+ (GetSeriesSlotListPlan)
+
ConfigPhysicalPlan.Factory.create(getSeriesSlotListPlan0.serializeToByteBuffer());
+ Assert.assertEquals(getSeriesSlotListPlan0, getSeriesSlotListPlan1);
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
index 3eda506ba9..e6a22a6c3d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -31,6 +32,12 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
@@ -572,4 +579,142 @@ public class IoTDBClusterPartitionIT {
Assert.assertTrue(allRunning);
}
}
+
+ @Test
+ public void testGetSlots()
+ throws TException, IOException, IllegalPathException,
InterruptedException {
+ final String sg = "root.sg";
+ final String sg0 = "root.sg0";
+ final String sg1 = "root.sg1";
+
+ final String d00 = sg0 + ".d0.s";
+ final String d01 = sg0 + ".d1.s";
+ final String d10 = sg1 + ".d0.s";
+ final String d11 = sg1 + ".d1.s";
+
+ final int seriesPartitionBatchSize = 100;
+ final int timePartitionBatchSize = 10;
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getConfigNodeConnection()) {
+ ByteBuffer buffer;
+ TSchemaPartitionReq schemaPartitionReq;
+
+ // We assert the correctness of setting storageGroups, dataPartitions,
schemaPartitions
+
+ // Set StorageGroups
+ client.setStorageGroup(new TSetStorageGroupReq(new
TStorageGroupSchema(sg0)));
+ client.setStorageGroup(new TSetStorageGroupReq(new
TStorageGroupSchema(sg1)));
+
+ // Create SchemaPartitions
+ buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
+ schemaPartitionReq = new TSchemaPartitionReq(buffer);
+ client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+
+ TDataPartitionReq dataPartitionReq;
+ TDataPartitionTableResp dataPartitionTableResp;
+
+ // Prepare partitionSlotsMap
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
partitionSlotsMap;
+
+ // Create DataPartitions
+ for (int i = 0; i < 2; i++) {
+ String storageGroup = sg + i;
+ partitionSlotsMap =
+ constructPartitionSlotsMap(
+ storageGroup, 0, seriesPartitionBatchSize, 0,
timePartitionBatchSize);
+
+ dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getConfigNodeConnection()) {
+ dataPartitionTableResp =
+
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ }
+
+ // Test getRouting api
+ TGetRoutingReq getRoutingReq;
+ TGetRoutingResp getRoutingResp;
+
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(0);
+ TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(0L);
+
+ getRoutingReq = new TGetRoutingReq(sg0, seriesPartitionSlot,
timePartitionSlot);
+ getRoutingResp = client.getRouting(getRoutingReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
getRoutingResp.status.getCode());
+ Assert.assertEquals(1, getRoutingResp.getDataRegionIdListSize());
+
+ // Test GetTimeSlotList api
+ TGetTimeSlotListReq getTimeSlotListReq;
+ TGetTimeSlotListResp getTimeSlotListResp;
+
+ seriesPartitionSlot.setSlotId(0);
+
+ getTimeSlotListReq = new TGetTimeSlotListReq(sg0, seriesPartitionSlot);
+ getTimeSlotListResp = client.getTimeSlotList(getTimeSlotListReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
getTimeSlotListResp.status.getCode());
+ Assert.assertEquals(timePartitionBatchSize,
getTimeSlotListResp.getTimeSlotListSize());
+
+ long startTime = 5;
+ getTimeSlotListReq.setStartTime(startTime * testTimePartitionInterval);
+
+ getTimeSlotListResp = client.getTimeSlotList(getTimeSlotListReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
getTimeSlotListResp.status.getCode());
+ Assert.assertEquals(
+ timePartitionBatchSize - startTime,
getTimeSlotListResp.getTimeSlotListSize());
+
+ long endTime = 6;
+ getTimeSlotListReq.setEndTime(endTime * testTimePartitionInterval);
+
+ getTimeSlotListResp = client.getTimeSlotList(getTimeSlotListReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
getTimeSlotListResp.status.getCode());
+ Assert.assertEquals(endTime - startTime,
getTimeSlotListResp.getTimeSlotListSize());
+
+ // Test GetSeriesSlotList api
+ TGetSeriesSlotListReq getSeriesSlotListReq;
+ TGetSeriesSlotListResp getSeriesSlotListResp;
+
+ getSeriesSlotListReq = new TGetSeriesSlotListReq(sg0);
+ getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
getSeriesSlotListResp.status.getCode());
+ Assert.assertEquals(102, getSeriesSlotListResp.getSeriesSlotListSize());
+
+ getSeriesSlotListReq.setType(TConsensusGroupType.PartitionRegion);
+
+ getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
getSeriesSlotListResp.status.getCode());
+ Assert.assertEquals(
+ seriesPartitionBatchSize + 2,
getSeriesSlotListResp.getSeriesSlotListSize());
+
+ getSeriesSlotListReq.setType(TConsensusGroupType.SchemaRegion);
+
+ getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
getSeriesSlotListResp.status.getCode());
+ Assert.assertEquals(2, getSeriesSlotListResp.getSeriesSlotListSize());
+
+ getSeriesSlotListReq.setType(TConsensusGroupType.DataRegion);
+
+ getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
getSeriesSlotListResp.status.getCode());
+ Assert.assertEquals(seriesPartitionBatchSize,
getSeriesSlotListResp.getSeriesSlotListSize());
+ }
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index ff3a75feb2..a8e840d968 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -160,18 +161,33 @@ public class DataPartitionTable {
return result;
}
- public static DataPartitionTable convertFromPlainMap(
- Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TConsensusGroupId>>>
- dataPartitionMap) {
- DataPartitionTable result = new DataPartitionTable();
+ /**
+ * Query a timePartition's corresponding dataRegionIds
+ *
+ * @param seriesSlotId SeriesPartitionSlot
+ * @param timeSlotId TimePartitionSlot
+ * @return the timePartition's corresponding dataRegionIds
+ */
+ public List<TConsensusGroupId> getRouting(
+ TSeriesPartitionSlot seriesSlotId, TTimePartitionSlot timeSlotId) {
+ if (!dataPartitionMap.containsKey(seriesSlotId)) {
+ return new ArrayList<>();
+ }
+ SeriesPartitionTable seriesPartitionTable =
dataPartitionMap.get(seriesSlotId);
+ return seriesPartitionTable.getRouting(timeSlotId);
+ }
- dataPartitionMap.forEach(
- (seriesPartitionSlot, seriesPartitionMap) ->
- result
- .getDataPartitionMap()
- .put(seriesPartitionSlot, new
SeriesPartitionTable(seriesPartitionMap)));
+ public List<TTimePartitionSlot> getTimeSlotList(
+ TSeriesPartitionSlot seriesSlotId, long startTime, long endTime) {
+ if (!dataPartitionMap.containsKey(seriesSlotId)) {
+ return new ArrayList<>();
+ }
+ SeriesPartitionTable seriesPartitionTable =
dataPartitionMap.get(seriesSlotId);
+ return seriesPartitionTable.getTimeSlotList(startTime, endTime);
+ }
- return result;
+ public List<TSeriesPartitionSlot> getSeriesSlotList() {
+ return new ArrayList<>(dataPartitionMap.keySet());
}
public void serialize(OutputStream outputStream, TProtocol protocol)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java
index 61b9189e16..d736b0a4a2 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -131,6 +132,10 @@ public class SchemaPartitionTable {
return result;
}
+ public List<TSeriesPartitionSlot> getSeriesSlotList() {
+ return new ArrayList<>(schemaPartitionMap.keySet());
+ }
+
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
ReadWriteIOUtils.write(schemaPartitionMap.size(), outputStream);
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index ac599887c1..ca673c2f85 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -39,6 +40,7 @@ import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
public class SeriesPartitionTable {
@@ -109,6 +111,25 @@ public class SeriesPartitionTable {
}
}
+ /**
+ * Query a timePartition's corresponding dataRegionIds
+ *
+ * @param timeSlotId Time partition's timeSlotId
+ * @return the timePartition's corresponding dataRegionIds
+ */
+ List<TConsensusGroupId> getRouting(TTimePartitionSlot timeSlotId) {
+ if (!seriesPartitionMap.containsKey(timeSlotId)) {
+ return new ArrayList<>();
+ }
+ return seriesPartitionMap.get(timeSlotId);
+ }
+
+ List<TTimePartitionSlot> getTimeSlotList(long startTime, long endTime) {
+ return seriesPartitionMap.keySet().stream()
+ .filter(e -> e.getStartTime() >= startTime && e.getStartTime() <
endTime)
+ .collect(Collectors.toList());
+ }
+
/**
* Create DataPartition within the specific SeriesPartitionSlot
*
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 6d2bde0808..dc566ded43 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -63,7 +63,6 @@ public class PartialPath extends Path implements
Comparable<Path>, Cloneable {
* = "root.sg.`d.1`.`s.1`" nodes = {"root", "sg", "`d.1`", "`s.1`"}
*
* @param path a full String of a time series path
- * @throws IllegalPathException
*/
public PartialPath(String path) throws IllegalPathException {
this.nodes = PathUtils.splitPathToDetachedNodes(path);
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ee1acf70fe..f9dd77cc37 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.commons.client.sync.SyncThriftClient;
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
@@ -59,7 +60,13 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -1068,6 +1075,57 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ @TestOnly
+ public TGetRoutingResp getRouting(TGetRoutingReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TGetRoutingResp resp = client.getRouting(req);
+ if (!updateConfigNodeLeader(resp.getStatus())) {
+ return resp;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ @TestOnly
+ public TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) throws
TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TGetTimeSlotListResp resp = client.getTimeSlotList(req);
+ if (!updateConfigNodeLeader(resp.getStatus())) {
+ return resp;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ @TestOnly
+ public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req)
throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TGetSeriesSlotListResp resp = client.getSeriesSlotList(req);
+ if (!updateConfigNodeLeader(resp.getStatus())) {
+ return resp;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
public static class Factory extends BaseClientFactory<PartitionRegionId,
ConfigNodeClient> {
public Factory(
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
index 35e55dea8a..b1cbd047a3 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
@@ -65,12 +65,12 @@ public class StorageGroupManager {
* recover status of each virtual storage group processor, null if this
logical storage group is
* new created
*/
- private AtomicBoolean[] isDataRegionReady;
+ private final AtomicBoolean[] isDataRegionReady;
/** number of ready virtual storage group processors */
private AtomicInteger readyDataRegionNum;
- private AtomicBoolean isSettling = new AtomicBoolean();
+ private final AtomicBoolean isSettling = new AtomicBoolean();
/** value of root.stats."root.sg".TOTAL_POINTS */
private long monitorSeriesValue;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
index 84baabbf5c..5fa061145a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -333,7 +333,8 @@ public class PartitionCache {
// third try to hit storage group in fast-fail way
getStorageGroupMap(result, devicePaths, true);
if (!result.isSuccess()) {
- throw new StatementAnalyzeException("Failed to get Storage Group
Map in three try.");
+ throw new StatementAnalyzeException(
+ "Failed to get Storage Group Map in three attempts.");
}
}
} catch (TException | MetadataException | IOException e) {
@@ -358,7 +359,7 @@ public class PartitionCache {
}
/**
- * invalid storage group cache
+ * invalidate storage group cache
*
* @param storageGroupNames the storage groups that need to invalid
*/
@@ -373,7 +374,7 @@ public class PartitionCache {
}
}
- /** invalid all storage group cache */
+ /** invalidate all storage group cache */
public void removeFromStorageGroupCache() {
storageGroupCacheLock.writeLock().lock();
try {
@@ -458,7 +459,7 @@ public class PartitionCache {
}
}
- /** invalid replicaSetCache */
+ /** invalidate replicaSetCache */
public void invalidReplicaSetCache() {
try {
regionReplicaSetLock.writeLock().lock();
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index e4ffd2a981..d930bde8c8 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -177,6 +177,39 @@ struct TDataPartitionTableResp {
2: optional map<string, map<common.TSeriesPartitionSlot,
map<common.TTimePartitionSlot, list<common.TConsensusGroupId>>>>
dataPartitionTable
}
+struct TGetRoutingReq {
+ 1: required string storageGroup
+ 2: required common.TSeriesPartitionSlot seriesSlotId
+ 3: required common.TTimePartitionSlot timeSlotId
+}
+
+struct TGetRoutingResp {
+ 1: required common.TSStatus status
+ 2: optional list<common.TConsensusGroupId> dataRegionIdList
+}
+
+struct TGetTimeSlotListReq {
+ 1: required string storageGroup
+ 2: required common.TSeriesPartitionSlot seriesSlotId
+ 3: optional i64 startTime
+ 4: optional i64 endTime
+}
+
+struct TGetTimeSlotListResp {
+ 1: required common.TSStatus status
+ 2: optional list<common.TTimePartitionSlot> timeSlotList
+}
+
+struct TGetSeriesSlotListReq {
+ 1: required string storageGroup
+ 2: optional common.TConsensusGroupType type
+}
+
+struct TGetSeriesSlotListResp {
+ 1: required common.TSStatus status
+ 2: optional list<common.TSeriesPartitionSlot> seriesSlotList
+}
+
// Authorize
struct TAuthorizerReq {
1: required i32 authorType
@@ -397,6 +430,7 @@ struct TSetSchemaTemplateReq {
1: required string name
2: required string path
}
+
struct TGetPathsSetTemplatesResp {
1: required common.TSStatus status
2: optional list<string> pathList
@@ -793,5 +827,19 @@ service IConfigNodeRPCService {
/** Get PipeSink by name, if name is empty, get all PipeSink */
TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req)
+
+ // ======================================================
+ // TestTools
+ // ======================================================
+
+ /** Get a particular DataPartition's corresponding Regions */
+ TGetRoutingResp getRouting(TGetRoutingReq req)
+
+ /** Get a specific SeriesSlot's TimeSlots by start time and end time */
+ TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req)
+
+ /** Get the given storage group's assigned SeriesSlots */
+ TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req)
+
}