This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c96cfa7 [IOTDB-2684] refactoring confignode architecture and schema
partition assign (#5374)
c96cfa7 is described below
commit c96cfa711d089d0661cc5553976dd0f6575ddfde
Author: wangchao316 <[email protected]>
AuthorDate: Fri Apr 1 17:50:07 2022 +0800
[IOTDB-2684] refactoring confignode architecture and schema partition
assign (#5374)
---
.../iotdb/confignode/conf/ConfigNodeConfCheck.java | 28 ++-
...chemaDataSet.java => DataPartitionDataSet.java} | 18 +-
.../consensus/response/SchemaPartitionDataSet.java | 79 +++++++
.../response/StorageGroupSchemaDataSet.java | 8 +-
.../iotdb/confignode/manager/ConfigManager.java | 214 +++++++++---------
.../{ConfigManager.java => ConsensusManager.java} | 26 +--
.../iotdb/confignode/manager/DataNodeManager.java | 169 ++++++++++++++
.../apache/iotdb/confignode/manager/Manager.java | 119 ++++++++++
.../iotdb/confignode/manager/PartitionManager.java | 146 +++++++++++++
.../iotdb/confignode/manager/RegionManager.java | 152 +++++++++++++
.../iotdb/confignode/partition/DataNodeInfo.java | 8 +-
.../confignode/partition/DataPartitionInfo.java | 65 ++----
...{DataPartitionInfo.java => DataRegionInfo.java} | 46 ++--
.../iotdb/confignode/partition/PartitionTable.java | 189 ----------------
.../confignode/partition/SchemaPartitionInfo.java | 75 +++++--
...emaPartitionInfo.java => SchemaRegionInfo.java} | 41 ++--
.../partition/SchemaRegionReplicaSet.java | 82 +++++++
.../persistence/DataNodeInfoPersistence.java | 186 ++++++++++++++++
.../persistence/PartitionInfoPersistence.java | 157 +++++++++++++
.../persistence/RegionInfoPersistence.java | 189 ++++++++++++++++
.../iotdb/confignode/physical/PhysicalPlan.java | 14 ++
.../confignode/physical/PhysicalPlanType.java | 6 +-
.../confignode/physical/sys/DataPartitionPlan.java | 78 +++++++
.../physical/sys/SchemaPartitionPlan.java | 98 +++++++++
.../physical/sys/SetStorageGroupPlan.java | 30 +++
.../confignode/service/balancer/LoadBalancer.java | 12 -
.../confignode/service/executor/PlanExecutor.java | 32 ++-
.../server/ConfigNodeRPCServerProcessor.java | 61 ++++--
.../confignode/util/SerializeDeserializeUtil.java | 242 +++++++++++++++++++++
.../hash/DeviceGroupHashExecutorManualTest.java | 4 +-
.../server/ConfigNodeRPCServerProcessorTest.java | 151 +++++++++++++
.../utils/SerializeDeserializeUtilTest.java | 90 ++++++++
.../apache/iotdb/consensus/common/Endpoint.java | 24 +-
.../commons/partition/SchemaPartitionInfo.java | 17 ++
.../db/mpp/sql/analyze/IPartitionFetcher.java | 2 +-
.../src/main/thrift/confignode.thrift | 23 +-
36 files changed, 2349 insertions(+), 532 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
index a6e0f9c..2a8c495 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
@@ -52,16 +52,11 @@ public class ConfigNodeConfCheck {
public void checkConfig() throws ConfigurationException, IOException,
StartupException {
// If systemDir does not exist, create systemDir
File systemDir = new File(conf.getSystemDir());
- if (!systemDir.exists()) {
- if (systemDir.mkdirs()) {
- LOGGER.info("Make system dirs: {}", systemDir);
- } else {
- throw new IOException(
- String.format(
- "Start ConfigNode failed, because couldn't make system dirs:
%s.",
- systemDir.getAbsolutePath()));
- }
- }
+ createDir(systemDir);
+
+ // If consensusDir does not exist, create consensusDir
+ File consensusDir = new File(conf.getConsensusDir());
+ createDir(consensusDir);
File specialPropertiesFile =
new File(conf.getSystemDir() + File.separator +
ConfigNodeConstant.SPECIAL_CONF_NAME);
@@ -90,6 +85,19 @@ public class ConfigNodeConfCheck {
}
}
+ private void createDir(File dir) throws IOException {
+ if (!dir.exists()) {
+ if (dir.mkdirs()) {
+ LOGGER.info("Make dirs: {}", dir);
+ } else {
+ throw new IOException(
+ String.format(
+ "Start ConfigNode failed, because couldn't make system dirs:
%s.",
+ dir.getAbsolutePath()));
+ }
+ }
+ }
+
/**
* There are some special parameters that can't be changed after once we
start ConfigNode.
* Therefore, store them in iotdb-confignode-special.properties at the first
startup
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
similarity index 70%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
index a523420..8c8c811 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
@@ -16,22 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.consensus.response;
-import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
import org.apache.iotdb.consensus.common.DataSet;
-import java.util.List;
-
-public class StorageGroupSchemaDataSet implements DataSet {
-
- private final List<StorageGroupSchema> schemaList;
+public class DataPartitionDataSet implements DataSet {
+ private DataPartitionInfo dataPartitionInfo;
- public StorageGroupSchemaDataSet(List<StorageGroupSchema> schemaList) {
- this.schemaList = schemaList;
+ public DataPartitionInfo getDataPartitionInfo() {
+ return dataPartitionInfo;
}
- public List<StorageGroupSchema> getSchemaList() {
- return schemaList;
+ public void setDataPartitionInfos(DataPartitionInfo dataPartitionInfo) {
+ this.dataPartitionInfo = dataPartitionInfo;
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
new file mode 100644
index 0000000..0a0acbe
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.response;
+
+import org.apache.iotdb.confignode.partition.SchemaPartitionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.RegionReplicaSet;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaPartitionDataSet implements DataSet {
+ private SchemaPartitionInfo schemaPartitionInfo;
+
+ public SchemaPartitionInfo getSchemaPartitionInfo() {
+ return schemaPartitionInfo;
+ }
+
+ public void setSchemaPartitionInfo(SchemaPartitionInfo schemaPartitionInfos)
{
+ this.schemaPartitionInfo = schemaPartitionInfos;
+ }
+
+ public static org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo
+ convertRpcSchemaPartition(SchemaPartitionInfo schemaPartitionInfo) {
+ org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo
rpcSchemaPartitionInfo =
+ new org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo();
+
+ Map<String, Map<Integer, RegionReplicaSet>> schemaRegionRelicatSets = new
HashMap<>();
+
+ schemaPartitionInfo.getSchemaPartitionInfo().entrySet().stream()
+ .forEach(
+ entity -> {
+ schemaRegionRelicatSets.putIfAbsent(entity.getKey(), new
HashMap<>());
+ entity
+ .getValue()
+ .entrySet()
+ .forEach(
+ replica -> {
+ RegionReplicaSet regionReplicaSet = new
RegionReplicaSet();
+ regionReplicaSet.setRegionId(replica.getKey());
+ List<EndPoint> endPoints = new ArrayList<>();
+ replica
+ .getValue()
+ .getEndPointList()
+ .forEach(
+ point -> {
+ EndPoint endPoint = new
EndPoint(point.getIp(), point.getPort());
+ endPoints.add(endPoint);
+ });
+ regionReplicaSet.setEndpoint(endPoints);
+ schemaRegionRelicatSets
+ .get(entity.getKey())
+ .put(replica.getKey(), regionReplicaSet);
+ });
+ });
+
rpcSchemaPartitionInfo.setSchemaRegionDataNodesMap(schemaRegionRelicatSets);
+ return rpcSchemaPartitionInfo;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
index a523420..934f050 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
@@ -25,7 +25,9 @@ import java.util.List;
public class StorageGroupSchemaDataSet implements DataSet {
- private final List<StorageGroupSchema> schemaList;
+ private List<StorageGroupSchema> schemaList;
+
+ public StorageGroupSchemaDataSet() {}
public StorageGroupSchemaDataSet(List<StorageGroupSchema> schemaList) {
this.schemaList = schemaList;
@@ -34,4 +36,8 @@ public class StorageGroupSchemaDataSet implements DataSet {
public List<StorageGroupSchema> getSchemaList() {
return schemaList;
}
+
+ public void setSchemaList(List<StorageGroupSchema> schemaList) {
+ this.schemaList = schemaList;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 1fd5767..d533f06 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -16,154 +16,134 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager;
-import org.apache.iotdb.commons.hash.DeviceGroupHashExecutor;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import
org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
+import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.ConsensusGroupId;
-import org.apache.iotdb.consensus.common.Endpoint;
-import org.apache.iotdb.consensus.common.GroupType;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.ratis.RatisConsensus;
-import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * ConfigManager maintains consistency between PartitionTables in the
ConfigNodeGroup. Expose the
- * query interface for the PartitionTable
- */
-public class ConfigManager {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigManager.class);
+/** Entry of all management, AssignPartitionManager,AssignRegionManager. */
+public class ConfigManager implements Manager {
private static final ConfigNodeConf conf =
ConfigNodeDescriptor.getInstance().getConf();
+ private static final TSStatus ERROR_TSSTATUS =
+ new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+
+ /** manage consensus, write or read consensus */
+ private final ConsensusManager consensusManager;
+
+ /** manage data node */
+ private final DataNodeManager dataNodeManager;
- private IConsensus consensusImpl;
- private ConsensusGroupId consensusGroupId;
+ /** manages assignment of data partitions and schema partitions */
+ private final PartitionManager partitionManager;
- private DeviceGroupHashExecutor hashExecutor;
+ /** manages assignment of schema regions and data regions */
+ private final RegionManager regionManager;
public ConfigManager() throws IOException {
- setHashExecutor();
- setConsensusLayer();
+ this.dataNodeManager = new DataNodeManager(this);
+ this.partitionManager = new PartitionManager(this);
+ this.regionManager = new RegionManager(this);
+ this.consensusManager = new ConsensusManager();
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
+ public TSStatus registerDataNode(PhysicalPlan physicalPlan) {
+ if (physicalPlan instanceof RegisterDataNodePlan) {
+ return dataNodeManager.registerDataNode((RegisterDataNodePlan)
physicalPlan);
+ }
+ return ERROR_TSSTATUS;
}
- /** Build DeviceGroupHashExecutor */
- private void setHashExecutor() {
- try {
- Class<?> executor =
Class.forName(conf.getDeviceGroupHashExecutorClass());
- Constructor<?> executorConstructor = executor.getConstructor(int.class);
- hashExecutor =
- (DeviceGroupHashExecutor)
executorConstructor.newInstance(conf.getDeviceGroupCount());
- } catch (ClassNotFoundException
- | NoSuchMethodException
- | InstantiationException
- | IllegalAccessException
- | InvocationTargetException e) {
- LOGGER.error(
- "Couldn't Constructor DeviceGroupHashExecutor class: {}",
- conf.getDeviceGroupHashExecutorClass(),
- e);
- hashExecutor = null;
+ @Override
+ public DataSet getDataNodeInfo(PhysicalPlan physicalPlan) {
+ if (physicalPlan instanceof QueryDataNodeInfoPlan) {
+ return dataNodeManager.getDataNodeInfo((QueryDataNodeInfoPlan)
physicalPlan);
}
+ return new DataNodesInfoDataSet();
}
- public int getDeviceGroupID(String device) {
- return hashExecutor.getDeviceGroupID(device);
+ @Override
+ public DataSet getStorageGroupSchema() {
+ return regionManager.getStorageGroupSchema();
}
- /** Build ConfigNodeGroup ConsensusLayer */
- private void setConsensusLayer() throws IOException {
- // There is only one ConfigNodeGroup
- consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
-
- // If consensusDir does not exist, create consensusDir
- File consensusDir = new File(conf.getConsensusDir());
- if (!consensusDir.exists()) {
- if (consensusDir.mkdirs()) {
- LOGGER.info("Make consensus dirs: {}", consensusDir);
- } else {
- throw new IOException(
- String.format(
- "Start ConfigNode failed, because couldn't make system dirs:
%s.",
- consensusDir.getAbsolutePath()));
- }
+ @Override
+ public TSStatus setStorageGroup(PhysicalPlan physicalPlan) {
+ if (physicalPlan instanceof SetStorageGroupPlan) {
+ return regionManager.setStorageGroup((SetStorageGroupPlan) physicalPlan);
}
+ return ERROR_TSSTATUS;
+ }
- // Implement specific consensus
- switch (conf.getConsensusType()) {
- case STANDALONE:
- constructStandAloneConsensus();
- break;
- case RATIS:
- constructRatisConsensus();
- break;
- default:
- throw new IllegalArgumentException(
- "Start ConfigNode failed, unrecognized ConsensusType: "
- + conf.getConsensusType().getTypeName());
+ @Override
+ public DataNodeManager getDataNodeManager() {
+ return dataNodeManager;
+ }
+
+ @Override
+ public DataSet getDataPartition(PhysicalPlan physicalPlan) {
+ if (physicalPlan instanceof DataPartitionPlan) {
+ return partitionManager.getDataPartition((DataPartitionPlan)
physicalPlan);
}
+ return new DataNodesInfoDataSet();
}
- private void constructStandAloneConsensus() throws IOException {
- // Standalone consensus
- consensusImpl = new StandAloneConsensus(id -> new
PartitionRegionStateMachine());
- consensusImpl.start();
-
- // Standalone ConsensusGroup
- consensusImpl.addConsensusGroup(
- consensusGroupId,
- Collections.singletonList(
- new Peer(
- consensusGroupId, new Endpoint(conf.getRpcAddress(),
conf.getInternalPort()))));
+ @Override
+ public DataSet getSchemaPartition(PhysicalPlan physicalPlan) {
+ if (physicalPlan instanceof SchemaPartitionPlan) {
+ return partitionManager.getSchemaPartition((SchemaPartitionPlan)
physicalPlan);
+ }
+ return new DataNodesInfoDataSet();
}
- private void constructRatisConsensus() throws IOException {
- // Ratis consensus local implement
- consensusImpl =
- RatisConsensus.newBuilder()
- .setEndpoint(new Endpoint(conf.getRpcAddress(),
conf.getInternalPort()))
- .setStateMachineRegistry(id -> new PartitionRegionStateMachine())
- .setStorageDir(new File(conf.getConsensusDir()))
- .build();
- consensusImpl.start();
-
- // Build ratis group from user properties
- LOGGER.info(
- "Set ConfigNode consensus group {}...",
- Arrays.toString(conf.getConfigNodeGroupAddressList()));
- List<Peer> peerList = new ArrayList<>();
- for (Endpoint endpoint : conf.getConfigNodeGroupAddressList()) {
- peerList.add(new Peer(consensusGroupId, endpoint));
+ @Override
+ public RegionManager getRegionManager() {
+ return regionManager;
+ }
+
+ @Override
+ public DataSet applySchemaPartition(PhysicalPlan physicalPlan) {
+ if (physicalPlan instanceof SchemaPartitionPlan) {
+ return partitionManager.applySchemaPartition((SchemaPartitionPlan)
physicalPlan);
}
- consensusImpl.addConsensusGroup(consensusGroupId, peerList);
+ return new DataNodesInfoDataSet();
}
- /** Transmit PhysicalPlan to confignode.consensus.statemachine */
- public ConsensusWriteResponse write(PhysicalPlan plan) {
- return consensusImpl.write(consensusGroupId, plan);
+ @Override
+ public DataSet applyDataPartition(PhysicalPlan physicalPlan) {
+ if (physicalPlan instanceof DataPartitionPlan) {
+ return partitionManager.applyDataPartition((DataPartitionPlan)
physicalPlan);
+ }
+ return new DataNodesInfoDataSet();
}
- /** Transmit PhysicalPlan to confignode.consensus.statemachine */
- public ConsensusReadResponse read(PhysicalPlan plan) {
- return consensusImpl.read(consensusGroupId, plan);
+ @Override
+ public DeviceGroupHashInfo getDeviceGroupHashInfo() {
+ return new DeviceGroupHashInfo(
+ conf.getDeviceGroupCount(), conf.getDeviceGroupHashExecutorClass());
}
- // TODO: Interfaces for LoadBalancer control
+ @Override
+ public ConsensusManager getConsensusManager() {
+ return consensusManager;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
similarity index 88%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 1fd5767..0b8d4ee 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -45,21 +45,18 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-/**
- * ConfigManager maintains consistency between PartitionTables in the
ConfigNodeGroup. Expose the
- * query interface for the PartitionTable
- */
-public class ConfigManager {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigManager.class);
+/** ConsensusManager maintains consensus class, request will redirect to
consensus layer */
+public class ConsensusManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConsensusManager.class);
private static final ConfigNodeConf conf =
ConfigNodeDescriptor.getInstance().getConf();
private IConsensus consensusImpl;
+
private ConsensusGroupId consensusGroupId;
private DeviceGroupHashExecutor hashExecutor;
- public ConfigManager() throws IOException {
+ public ConsensusManager() throws IOException {
setHashExecutor();
setConsensusLayer();
}
@@ -93,19 +90,6 @@ public class ConfigManager {
// There is only one ConfigNodeGroup
consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
- // If consensusDir does not exist, create consensusDir
- File consensusDir = new File(conf.getConsensusDir());
- if (!consensusDir.exists()) {
- if (consensusDir.mkdirs()) {
- LOGGER.info("Make consensus dirs: {}", consensusDir);
- } else {
- throw new IOException(
- String.format(
- "Start ConfigNode failed, because couldn't make system dirs:
%s.",
- consensusDir.getAbsolutePath()));
- }
- }
-
// Implement specific consensus
switch (conf.getConsensusType()) {
case STANDALONE:
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
new file mode 100644
index 0000000..83f19ef
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
+import org.apache.iotdb.confignode.partition.DataNodeInfo;
+import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
+import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** Manager server info of data node, add node or remove node */
+public class DataNodeManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeManager.class);
+
+ private DataNodeInfoPersistence dataNodeInfo =
DataNodeInfoPersistence.getInstance();
+
+ private Manager configManager;
+
+ /** TODO:do some operate after add node or remove node */
+ private List<ChangeServerListener> listeners = new CopyOnWriteArrayList<>();
+
+ private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
+
+ private int nextDataNodeId;
+
+ public DataNodeManager(Manager configManager) {
+ this.configManager = configManager;
+ this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
+ }
+
+ /**
+ * register data node info when a data node starts
+ *
+ * @param plan RegisterDataNodePlan
+ * @return success status if the data node registers for the first time
+ */
+ public TSStatus registerDataNode(RegisterDataNodePlan plan) {
+ TSStatus result;
+ DataNodeInfo info = plan.getInfo();
+ dataNodeInfoReadWriteLock.writeLock().lock();
+ try {
+ if (dataNodeInfo.containsValue(info)) {
+ // TODO: optimize
+ result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ result.setMessage(String.valueOf(dataNodeInfo.getDataNodeInfo(info)));
+ } else {
+ info.setDataNodeID(nextDataNodeId);
+ ConsensusWriteResponse consensusWriteResponse =
getConsensusManager().write(plan);
+ nextDataNodeId += 1;
+ return consensusWriteResponse.getStatus();
+ }
+ } finally {
+ dataNodeInfoReadWriteLock.writeLock().unlock();
+ }
+ return result;
+ }
+
+ /**
+ * get data node info
+ *
+ * @param plan QueryDataNodeInfoPlan
+ * @return all data node info if dataNodeId of plan is -1
+ */
+ public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
+ return (DataNodesInfoDataSet)
getConsensusManager().read(plan).getDataset();
+ }
+
+ public Set<Integer> getDataNodeId() {
+ return dataNodeInfo.getDataNodeIds();
+ }
+
+ private ConsensusManager getConsensusManager() {
+ return configManager.getConsensusManager();
+ }
+
+ public void registerListener(final ChangeServerListener serverListener) {
+ listeners.add(serverListener);
+ }
+
+ public boolean unregisterListener(final ChangeServerListener serverListener)
{
+ return listeners.remove(serverListener);
+ }
+
+ /** TODO: wait for data nodes to register */
+ public void waitForDataNodes() {
+ listeners.stream().forEach(serverListener -> serverListener.waiting());
+ }
+
+ public Map<Integer, DataNodeInfo> getOnlineDataNodes() {
+ return dataNodeInfo.getOnlineDataNodes();
+ }
+
+ private class ServerStartListenerThread extends Thread implements
ChangeServerListener {
+ private boolean changed = false;
+
+ ServerStartListenerThread() {
+ setDaemon(true);
+ }
+
+ @Override
+ public void addDataNode(DataNodeInfo DataNodeInfo) {
+ serverChanged();
+ }
+
+ @Override
+ public void removeDataNode(DataNodeInfo dataNodeInfo) {
+ serverChanged();
+ }
+
+ private synchronized void serverChanged() {
+ changed = true;
+ this.notify();
+ }
+
+ @Override
+ public void run() {
+ while (!configManager.isStopped()) {}
+ }
+ }
+
+ /** TODO: For listener for add or remove data node */
+ public interface ChangeServerListener {
+
+ /** Started waiting on DataNode to check */
+ default void waiting() {};
+
+ /**
+ * The server has joined the cluster
+ *
+ * @param dataNodeInfo datanode info
+ */
+ void addDataNode(final DataNodeInfo dataNodeInfo);
+
+ /**
+ * remove data node
+ *
+ * @param dataNodeInfo data node info
+ */
+ void removeDataNode(final DataNodeInfo dataNodeInfo);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
new file mode 100644
index 0000000..a0228f1
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+/**
+ * A subset of services provided by {@link ConfigManager}. For internal use only,
passed to Managers and
+ * services.
+ */
+public interface Manager {
+
+ /**
+ * if a service stop
+ *
+ * @return true if service stopped
+ */
+ public boolean isStopped();
+
+ /**
+ * register data node
+ *
+ * @param physicalPlan physical plan
+ * @return status
+ */
+ public TSStatus registerDataNode(PhysicalPlan physicalPlan);
+
+ /**
+ * get data node info
+ *
+ * @param physicalPlan physical plan
+ * @return data set
+ */
+ DataSet getDataNodeInfo(PhysicalPlan physicalPlan);
+
+ /**
+ * get storage group schema
+ *
+ * @return data set
+ */
+ DataSet getStorageGroupSchema();
+
+ /**
+ * set storage group
+ *
+ * @param physicalPlan physical plan
+ * @return status
+ */
+ TSStatus setStorageGroup(PhysicalPlan physicalPlan);
+
+ /**
+ * get data node info manager
+ *
+ * @return DataNodeInfoManager instance
+ */
+ DataNodeManager getDataNodeManager();
+
+ /**
+ * get data partition
+ *
+ * @param physicalPlan physical plan
+ * @return data set
+ */
+ DataSet getDataPartition(PhysicalPlan physicalPlan);
+
+ /**
+ * get schema partition
+ *
+ * @param physicalPlan physical plan
+ * @return data set
+ */
+ DataSet getSchemaPartition(PhysicalPlan physicalPlan);
+
+ /**
+ * get assign region manager
+ *
+ * @return AssignRegionManager instance
+ */
+ RegionManager getRegionManager();
+
+ /**
+ * apply schema partition
+ *
+ * @param physicalPlan physical plan
+ * @return data set
+ */
+ DataSet applySchemaPartition(PhysicalPlan physicalPlan);
+
+ /**
+ * apply data partition
+ *
+ * @param physicalPlan physical plan
+ * @return data set
+ */
+ DataSet applyDataPartition(PhysicalPlan physicalPlan);
+
+ DeviceGroupHashInfo getDeviceGroupHashInfo();
+
+ ConsensusManager getConsensusManager();
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
new file mode 100644
index 0000000..70dc6a9
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
+import org.apache.iotdb.confignode.partition.DataPartitionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionReplicaSet;
+import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
+import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** manage data partition and schema partition */
+public class PartitionManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionManager.class);
+
+ /** schema partition read write lock */
+ private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
+
+ /** data partition read write lock */
+ private final ReentrantReadWriteLock dataPartitionReadWriteLock;
+
+ // TODO: Serialize and Deserialize
+ private final DataPartitionInfo dataPartition;
+
+ private final Manager configNodeManager;
+
+ public PartitionManager(Manager configNodeManager) {
+ this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
+ this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
+ this.configNodeManager = configNodeManager;
+ this.dataPartition = new DataPartitionInfo();
+ }
+
+ /**
+ * Get schema partition
+ *
+ * @param physicalPlan storageGroup and deviceGroupIDs
+ * @return Empty Data Set if does not exist
+ */
+ public DataSet getSchemaPartition(SchemaPartitionPlan physicalPlan) {
+ SchemaPartitionDataSet schemaPartitionDataSet;
+ schemaPartitionReadWriteLock.readLock().lock();
+ try {
+ ConsensusReadResponse consensusReadResponse =
getConsensusManager().read(physicalPlan);
+ schemaPartitionDataSet = (SchemaPartitionDataSet)
consensusReadResponse.getDataset();
+ } finally {
+ schemaPartitionReadWriteLock.readLock().unlock();
+ }
+ return schemaPartitionDataSet;
+ }
+
+ /**
+ * If does not exist, apply a new schema partition
+ *
+ * @param physicalPlan storage group and device group id
+ * @return Schema Partition data set
+ */
+ public DataSet applySchemaPartition(SchemaPartitionPlan physicalPlan) {
+ String storageGroup = physicalPlan.getStorageGroup();
+ List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
+ List<Integer> noAssignDeviceGroupId =
+ PartitionInfoPersistence.getInstance()
+ .filterSchemaRegionNoAssignDeviceGroupId(storageGroup,
deviceGroupIDs);
+
+ // allocate partition by storage group and device group id
+ schemaPartitionReadWriteLock.writeLock().lock();
+ try {
+ Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets =
+ allocateSchemaPartition(storageGroup, noAssignDeviceGroupId);
+ physicalPlan.setDeviceGroupIdReplicaSet(deviceGroupIdReplicaSets);
+ getConsensusManager().write(physicalPlan);
+ LOGGER.info("Allocate schema partition to {}.",
deviceGroupIdReplicaSets);
+ } finally {
+ schemaPartitionReadWriteLock.writeLock().unlock();
+ }
+
+ return getSchemaPartition(physicalPlan);
+ }
+
+ /**
+ * TODO: allocate schema partition by balancer
+ *
+ * @param storageGroup storage group
+ * @param deviceGroupIDs device group id list
+ */
+ private Map<Integer, SchemaRegionReplicaSet> allocateSchemaPartition(
+ String storageGroup, List<Integer> deviceGroupIDs) {
+ List<SchemaRegionReplicaSet> schemaRegionEndPoints =
+ RegionInfoPersistence.getInstance().getSchemaRegionEndPoint();
+ Random random = new Random();
+ Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets = new
HashMap<>();
+ for (int i = 0; i < deviceGroupIDs.size(); i++) {
+ SchemaRegionReplicaSet schemaRegionReplicaSet =
+
schemaRegionEndPoints.get(random.nextInt(schemaRegionEndPoints.size()));
+ deviceGroupIdReplicaSets.put(deviceGroupIDs.get(i),
schemaRegionReplicaSet);
+ }
+ return deviceGroupIdReplicaSets;
+ }
+
+ private ConsensusManager getConsensusManager() {
+ return configNodeManager.getConsensusManager();
+ }
+
+ /**
+ * TODO:allocate schema partition by balancer
+ *
+ * @param physicalPlan physical plan
+ * @return data set
+ */
+ public DataSet applyDataPartition(DataPartitionPlan physicalPlan) {
+ return null;
+ }
+
+ public DataSet getDataPartition(DataPartitionPlan physicalPlan) {
+ return null;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
new file mode 100644
index 0000000..5e11594
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import
org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
+import org.apache.iotdb.confignode.partition.DataRegionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
+import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
+import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** manage data partition and schema partition */
+public class RegionManager {
+ private static final ConfigNodeConf conf =
ConfigNodeDescriptor.getInstance().getConf();
+ private static final int regionReplicaCount = conf.getRegionReplicaCount();
+ private static final int schemaRegionCount = conf.getSchemaRegionCount();
+ private static final int dataRegionCount = conf.getDataRegionCount();
+
+ /** partition read write lock */
+ private final ReentrantReadWriteLock partitionReadWriteLock;
+
+ // TODO: Serialize and Deserialize
+ private int nextSchemaRegionGroup = 0;
+ // TODO: Serialize and Deserialize
+ private int nextDataRegionGroup = 0;
+
+ private RegionInfoPersistence regionInfoPersistence =
RegionInfoPersistence.getInstance();
+
+ private final Manager configNodeManager;
+
+ public RegionManager(Manager configNodeManager) {
+ this.partitionReadWriteLock = new ReentrantReadWriteLock();
+ this.configNodeManager = configNodeManager;
+ }
+
+ private ConsensusManager getConsensusManager() {
+ return configNodeManager.getConsensusManager();
+ }
+
+ /**
+ * 1. region allocation 2. add to storage group map
+ *
+ * @param plan SetStorageGroupPlan
+ * @return TSStatusCode.SUCCESS_STATUS if region allocate
+ */
+ public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
+ TSStatus result;
+ partitionReadWriteLock.writeLock().lock();
+ try {
+ if (configNodeManager.getDataNodeManager().getDataNodeId().size() <
regionReplicaCount) {
+ result = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ result.setMessage("DataNode is not enough, please register more.");
+ } else {
+ if
(regionInfoPersistence.containsStorageGroup(plan.getSchema().getName())) {
+ result = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ result.setMessage(
+ String.format("StorageGroup %s is already set.",
plan.getSchema().getName()));
+ } else {
+ String storageGroupName = plan.getSchema().getName();
+ StorageGroupSchema storageGroupSchema = new
StorageGroupSchema(storageGroupName);
+
+ // allocate schema region
+ SchemaRegionInfo schemaRegionInfo =
schemaRegionAllocation(storageGroupSchema);
+ plan.setSchemaRegionInfo(schemaRegionInfo);
+
+ // allocate data region
+ DataRegionInfo dataRegionInfo =
dataRegionAllocation(storageGroupSchema);
+ plan.setDataRegionInfo(dataRegionInfo);
+
+ // write consensus
+ result = getConsensusManager().write(plan).getStatus();
+ }
+ }
+ } finally {
+ partitionReadWriteLock.writeLock().unlock();
+ }
+ return result;
+ }
+
+ private DataNodeManager getDataNodeInfoManager() {
+ return configNodeManager.getDataNodeManager();
+ }
+
+ private SchemaRegionInfo schemaRegionAllocation(StorageGroupSchema
storageGroupSchema) {
+
+ SchemaRegionInfo schemaRegionInfo = new SchemaRegionInfo();
+ // TODO: Use CopySet algorithm to optimize region allocation policy
+ for (int i = 0; i < schemaRegionCount; i++) {
+ List<Integer> dataNodeList = new
ArrayList<>(getDataNodeInfoManager().getDataNodeId());
+ Collections.shuffle(dataNodeList);
+ schemaRegionInfo.addSchemaRegion(
+ nextSchemaRegionGroup, dataNodeList.subList(0, regionReplicaCount));
+ storageGroupSchema.addSchemaRegionGroup(nextSchemaRegionGroup);
+ nextSchemaRegionGroup += 1;
+ }
+ return schemaRegionInfo;
+ }
+
+ /**
+ * TODO: Only perform in leader node, @rongzhao
+ *
+ * @param storageGroupSchema
+ */
+ private DataRegionInfo dataRegionAllocation(StorageGroupSchema
storageGroupSchema) {
+ // TODO: Use CopySet algorithm to optimize region allocation policy
+ DataRegionInfo dataRegionInfo = new DataRegionInfo();
+ for (int i = 0; i < dataRegionCount; i++) {
+ List<Integer> dataNodeList = new
ArrayList<>(getDataNodeInfoManager().getDataNodeId());
+ Collections.shuffle(dataNodeList);
+ dataRegionInfo.createDataRegion(
+ nextDataRegionGroup, dataNodeList.subList(0, regionReplicaCount));
+ storageGroupSchema.addDataRegionGroup(nextDataRegionGroup);
+ nextDataRegionGroup += 1;
+ }
+ return dataRegionInfo;
+ }
+
+ public StorageGroupSchemaDataSet getStorageGroupSchema() {
+
+ ConsensusReadResponse readResponse =
+ getConsensusManager().read(new QueryStorageGroupSchemaPlan());
+ return (StorageGroupSchemaDataSet) readResponse.getDataset();
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
index 52134c6..9e78f16 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
@@ -34,6 +34,8 @@ public class DataNodeInfo {
public DataNodeInfo(int dataNodeID, Endpoint endPoint) {
this.dataNodeID = dataNodeID;
this.endPoint = endPoint;
+ dataRegionGroupIDs = new ArrayList<>();
+ schemaRegionGroupIDs = new ArrayList<>();
}
public int getDataNodeID() {
@@ -49,9 +51,6 @@ public class DataNodeInfo {
}
public void addSchemaRegionGroup(int id) {
- if (schemaRegionGroupIDs == null) {
- schemaRegionGroupIDs = new ArrayList<>();
- }
schemaRegionGroupIDs.add(id);
}
@@ -60,9 +59,6 @@ public class DataNodeInfo {
}
public void addDataRegionGroup(int id) {
- if (dataRegionGroupIDs == null) {
- dataRegionGroupIDs = new ArrayList<>();
- }
dataRegionGroupIDs.add(id);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
index 1ccbfa3..69aac80 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,63 +18,26 @@
*/
package org.apache.iotdb.confignode.partition;
-import java.util.ArrayList;
-import java.util.HashMap;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.TimePartitionId;
+
import java.util.List;
import java.util.Map;
public class DataPartitionInfo {
- // TODO: Serialize and Deserialize
- // Map<StorageGroup, Map<DeviceGroupID, Map<TimeInterval,
List<DataRegionID>>>>
- private final Map<String, Map<Integer, Map<Long, List<Integer>>>>
dataPartitionTable;
- // TODO: Serialize and Deserialize
- // Map<DataRegionID, List<DataNodeID>>
- private final Map<Integer, List<Integer>> dataRegionDataNodesMap;
-
- // Map<StorageGroup, Map<DeviceGroupID, DataPartitionRule>>
- private final Map<String, Map<Integer, DataPartitionRule>>
dataPartitionRuleTable;
-
- public DataPartitionInfo() {
- this.dataPartitionTable = new HashMap<>();
- this.dataRegionDataNodesMap = new HashMap<>();
-
- this.dataPartitionRuleTable = new HashMap<>();
- }
-
- public void createDataPartition(
- String storageGroup, int deviceGroup, long timeInterval, int
dataRegionGroup) {
- if (!dataPartitionTable.containsKey(storageGroup)) {
- dataPartitionTable.put(storageGroup, new HashMap<>());
- }
- if (!dataPartitionTable.get(storageGroup).containsKey(deviceGroup)) {
- dataPartitionTable.get(storageGroup).put(deviceGroup, new HashMap<>());
- }
- if
(!dataPartitionTable.get(storageGroup).get(deviceGroup).containsKey(timeInterval))
{
- dataPartitionTable.get(storageGroup).get(deviceGroup).put(timeInterval,
new ArrayList<>());
- }
-
dataPartitionTable.get(storageGroup).get(deviceGroup).get(timeInterval).add(dataRegionGroup);
- }
-
- public List<Integer> getDataPartition(String storageGroup, int deviceGroup,
long timeInterval) {
- if (dataPartitionTable.containsKey(storageGroup)) {
- if (dataPartitionTable.get(storageGroup).containsKey(deviceGroup)) {
- return
dataPartitionTable.get(storageGroup).get(deviceGroup).get(timeInterval);
- }
- }
- return null;
- }
-
- public void createDataRegion(int dataRegionGroup, List<Integer>
dataNodeList) {
- dataRegionDataNodesMap.put(dataRegionGroup, dataNodeList);
- }
+ // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId,
List<DataRegionPlaceInfo>>>>
+ private Map<String, Map<Integer, Map<TimePartitionId,
List<DataRegionReplicaSet>>>>
+ dataPartitionMap;
- public List<Integer> getDataRegionLocation(int dataRegionGroup) {
- return dataRegionDataNodesMap.get(dataRegionGroup);
+ public Map<String, Map<Integer, Map<TimePartitionId,
List<DataRegionReplicaSet>>>>
+ getDataPartitionMap() {
+ return dataPartitionMap;
}
- public void updateDataPartitionRule(
- String StorageGroup, int deviceGroup, DataPartitionRule rule) {
- // TODO: Data partition policy by @YongzaoDan
+ public void setDataPartitionMap(
+ Map<String, Map<Integer, Map<TimePartitionId,
List<DataRegionReplicaSet>>>>
+ dataPartitionMap) {
+ this.dataPartitionMap = dataPartitionMap;
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
similarity index 54%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
index 1ccbfa3..9441d14 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
@@ -18,51 +18,29 @@
*/
package org.apache.iotdb.confignode.partition;
-import java.util.ArrayList;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class DataPartitionInfo {
+public class DataRegionInfo {
// TODO: Serialize and Deserialize
- // Map<StorageGroup, Map<DeviceGroupID, Map<TimeInterval,
List<DataRegionID>>>>
- private final Map<String, Map<Integer, Map<Long, List<Integer>>>>
dataPartitionTable;
- // TODO: Serialize and Deserialize
// Map<DataRegionID, List<DataNodeID>>
- private final Map<Integer, List<Integer>> dataRegionDataNodesMap;
+ private Map<Integer, List<Integer>> dataRegionDataNodesMap;
// Map<StorageGroup, Map<DeviceGroupID, DataPartitionRule>>
private final Map<String, Map<Integer, DataPartitionRule>>
dataPartitionRuleTable;
- public DataPartitionInfo() {
- this.dataPartitionTable = new HashMap<>();
+ public DataRegionInfo() {
this.dataRegionDataNodesMap = new HashMap<>();
-
this.dataPartitionRuleTable = new HashMap<>();
}
- public void createDataPartition(
- String storageGroup, int deviceGroup, long timeInterval, int
dataRegionGroup) {
- if (!dataPartitionTable.containsKey(storageGroup)) {
- dataPartitionTable.put(storageGroup, new HashMap<>());
- }
- if (!dataPartitionTable.get(storageGroup).containsKey(deviceGroup)) {
- dataPartitionTable.get(storageGroup).put(deviceGroup, new HashMap<>());
- }
- if
(!dataPartitionTable.get(storageGroup).get(deviceGroup).containsKey(timeInterval))
{
- dataPartitionTable.get(storageGroup).get(deviceGroup).put(timeInterval,
new ArrayList<>());
- }
-
dataPartitionTable.get(storageGroup).get(deviceGroup).get(timeInterval).add(dataRegionGroup);
- }
-
- public List<Integer> getDataPartition(String storageGroup, int deviceGroup,
long timeInterval) {
- if (dataPartitionTable.containsKey(storageGroup)) {
- if (dataPartitionTable.get(storageGroup).containsKey(deviceGroup)) {
- return
dataPartitionTable.get(storageGroup).get(deviceGroup).get(timeInterval);
- }
- }
- return null;
+ public Map<Integer, List<Integer>> getDataRegionDataNodesMap() {
+ return dataRegionDataNodesMap;
}
public void createDataRegion(int dataRegionGroup, List<Integer>
dataNodeList) {
@@ -77,4 +55,12 @@ public class DataPartitionInfo {
String StorageGroup, int deviceGroup, DataPartitionRule rule) {
// TODO: Data partition policy by @YongzaoDan
}
+
+ public void serializeImpl(ByteBuffer buffer) {
+ SerializeDeserializeUtil.writeIntMapLists(dataRegionDataNodesMap, buffer);
+ }
+
+ public void deserializeImpl(ByteBuffer buffer) {
+ dataRegionDataNodesMap = SerializeDeserializeUtil.readIntMapLists(buffer);
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
deleted file mode 100644
index bc76b9f..0000000
---
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.partition;
-
-import org.apache.iotdb.confignode.conf.ConfigNodeConf;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
-import
org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
-import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * PartitionTable stores schema partition table, data partition table,
DataNode information,
- * StorageGroup schema and real-time write load allocation rules. The
PartitionTable is thread-safe.
- */
-public class PartitionTable {
-
- private static final ConfigNodeConf conf =
ConfigNodeDescriptor.getInstance().getConf();
- private static final int regionReplicaCount = conf.getRegionReplicaCount();
- private static final int schemaRegionCount = conf.getSchemaRegionCount();
- private static final int dataRegionCount = conf.getDataRegionCount();
-
- private final ReentrantReadWriteLock lock;
- // TODO: Serialize and Deserialize
- private final Map<String, StorageGroupSchema> storageGroupsMap;
-
- // TODO: Serialize and Deserialize
- private int nextDataNode = 0;
- // TODO: Serialize and Deserialize
- private int nextSchemaRegionGroup = 0;
- // TODO: Serialize and Deserialize
- private int nextDataRegionGroup = 0;
- // TODO: Serialize and Deserialize
- private final Map<Integer, DataNodeInfo> dataNodesMap; // Map<DataNodeID,
DataNodeInfo>
-
- // TODO: Serialize and Deserialize
- private final SchemaPartitionInfo schemaPartition;
-
- // TODO: Serialize and Deserialize
- private final DataPartitionInfo dataPartition;
-
- public PartitionTable() {
- this.lock = new ReentrantReadWriteLock();
- this.storageGroupsMap = new HashMap<>();
- this.dataNodesMap = new HashMap<>();
- this.schemaPartition = new SchemaPartitionInfo();
- this.dataPartition = new DataPartitionInfo();
- }
-
- public TSStatus registerDataNode(RegisterDataNodePlan plan) {
- TSStatus result;
- DataNodeInfo info = plan.getInfo();
- lock.writeLock().lock();
-
- if (dataNodesMap.containsValue(info)) {
- // TODO: optimize
- result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- for (Map.Entry<Integer, DataNodeInfo> entry : dataNodesMap.entrySet()) {
- if (entry.getValue().equals(info)) {
- result.setMessage(String.valueOf(entry.getKey()));
- break;
- }
- }
- } else {
- info.setDataNodeID(nextDataNode);
- dataNodesMap.put(info.getDataNodeID(), info);
- result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- result.setMessage(String.valueOf(nextDataNode));
- nextDataNode += 1;
- }
-
- lock.writeLock().unlock();
- return result;
- }
-
- public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
- DataNodesInfoDataSet result;
- lock.readLock().lock();
-
- if (dataNodesMap.size() == 0) {
- result = null;
- } else {
- result = new DataNodesInfoDataSet();
-
- if (plan.getDataNodeID() == -1) {
- result.setInfoList(new ArrayList<>(dataNodesMap.values()));
- } else {
- if (dataNodesMap.containsKey(plan.getDataNodeID())) {
-
result.setInfoList(Collections.singletonList(dataNodesMap.get(plan.getDataNodeID())));
- } else {
- result = null;
- }
- }
- }
-
- lock.readLock().unlock();
- return result;
- }
-
- public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
- TSStatus result;
- lock.writeLock().lock();
-
- if (dataNodesMap.size() < regionReplicaCount) {
- result = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- result.setMessage("DataNode is not enough, please register more.");
- } else {
- if (storageGroupsMap.containsKey(plan.getSchema().getName())) {
- result = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- result.setMessage(
- String.format("StorageGroup %s is already set.",
plan.getSchema().getName()));
- } else {
- StorageGroupSchema schema = new
StorageGroupSchema(plan.getSchema().getName());
- regionAllocation(schema);
- storageGroupsMap.put(schema.getName(), schema);
- result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- }
- }
-
- lock.writeLock().unlock();
- return result;
- }
-
- private void regionAllocation(StorageGroupSchema schema) {
- // TODO: Use CopySet algorithm to optimize region allocation policy
- for (int i = 0; i < schemaRegionCount; i++) {
- List<Integer> dataNodeList = new ArrayList<>(dataNodesMap.keySet());
- Collections.shuffle(dataNodeList);
- for (int j = 0; j < regionReplicaCount; j++) {
-
dataNodesMap.get(dataNodeList.get(j)).addSchemaRegionGroup(nextSchemaRegionGroup);
- }
- schemaPartition.createSchemaRegion(
- nextSchemaRegionGroup, dataNodeList.subList(0, regionReplicaCount));
- schema.addSchemaRegionGroup(nextSchemaRegionGroup);
- nextSchemaRegionGroup += 1;
- }
- for (int i = 0; i < dataRegionCount; i++) {
- List<Integer> dataNodeList = new ArrayList<>(dataNodesMap.keySet());
- Collections.shuffle(dataNodeList);
- for (int j = 0; j < regionReplicaCount; j++) {
-
dataNodesMap.get(dataNodeList.get(j)).addDataRegionGroup(nextDataRegionGroup);
- }
- dataPartition.createDataRegion(
- nextDataRegionGroup, dataNodeList.subList(0, regionReplicaCount));
- schema.addDataRegionGroup(nextDataRegionGroup);
- nextDataRegionGroup += 1;
- }
- }
-
- public StorageGroupSchemaDataSet getStorageGroupSchema() {
- StorageGroupSchemaDataSet result;
- lock.readLock().lock();
-
- if (storageGroupsMap.size() == 0) {
- result = null;
- } else {
- result = new StorageGroupSchemaDataSet(new
ArrayList<>(storageGroupsMap.values()));
- }
-
- lock.readLock().unlock();
- return result;
- }
-}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
index 3595598..d4ae3c6 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -21,42 +21,69 @@ package org.apache.iotdb.confignode.partition;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class SchemaPartitionInfo {
- // TODO: Serialize and Deserialize
- // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionID>>
- private final Map<String, Map<Integer, Integer>> schemaPartitionTable;
- // TODO: Serialize and Deserialize
- // Map<SchemaRegionID, List<DataNodeID>>
- private final Map<Integer, List<Integer>> schemaRegionDataNodesMap;
+ // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
+ private Map<String, Map<Integer, SchemaRegionReplicaSet>>
schemaPartitionInfo;
public SchemaPartitionInfo() {
- this.schemaPartitionTable = new HashMap<>();
- this.schemaRegionDataNodesMap = new HashMap<>();
+ schemaPartitionInfo = new HashMap<>();
}
- public void createSchemaPartition(String storageGroup, int deviceGroup, int
schemaRegion) {
- if (!schemaPartitionTable.containsKey(storageGroup)) {
- schemaPartitionTable.put(storageGroup, new HashMap<>());
- }
- schemaPartitionTable.get(storageGroup).put(deviceGroup, schemaRegion);
+ public Map<String, Map<Integer, SchemaRegionReplicaSet>>
getSchemaPartitionInfo() {
+ return schemaPartitionInfo;
}
- public Integer getSchemaPartition(String storageGroup, int deviceGroup) {
- if (schemaPartitionTable.containsKey(storageGroup)) {
- return schemaPartitionTable.get(storageGroup).get(deviceGroup);
- }
- return null;
+ public void setSchemaPartitionInfo(
+ Map<String, Map<Integer, SchemaRegionReplicaSet>> schemaPartitionInfo) {
+ this.schemaPartitionInfo = schemaPartitionInfo;
+ }
+
+ public Map<String, Map<Integer, SchemaRegionReplicaSet>> getSchemaPartition(
+ String storageGroup, List<Integer> deviceGroupIDs) {
+ Map<String, Map<Integer, SchemaRegionReplicaSet>> storageGroupMap = new
HashMap<>();
+ Map<Integer, SchemaRegionReplicaSet> deviceGroupMap = new HashMap<>();
+ deviceGroupIDs.forEach(
+ deviceGroupID -> {
+ if (schemaPartitionInfo.get(storageGroup) != null
+ &&
schemaPartitionInfo.get(storageGroup).containsKey(deviceGroupID)) {
+ deviceGroupMap.put(
+ deviceGroupID,
schemaPartitionInfo.get(storageGroup).get(deviceGroupID));
+ }
+ });
+ storageGroupMap.put(storageGroup, deviceGroupMap);
+ return storageGroupMap;
}
- public void createSchemaRegion(int schemaRegion, List<Integer> dataNode) {
- if (!schemaRegionDataNodesMap.containsKey(schemaRegion)) {
- schemaRegionDataNodesMap.put(schemaRegion, dataNode);
+ /**
+ * Filter out unassigned device groups
+ *
+ * @param storageGroup storage group name
+ * @param deviceGroupIDs device group id list
+ * @return deviceGroupIDs does not assigned
+ */
+ public List<Integer> filterNoAssignDeviceGroupId(
+ String storageGroup, List<Integer> deviceGroupIDs) {
+ if (!schemaPartitionInfo.containsKey(storageGroup)) {
+ return deviceGroupIDs;
}
+ return deviceGroupIDs.stream()
+ .filter(
+ id -> {
+ if
(schemaPartitionInfo.get(storageGroup).containsKey(deviceGroupIDs)) {
+ return false;
+ }
+ return true;
+ })
+ .collect(Collectors.toList());
}
- public List<Integer> getSchemaRegionLocation(int schemaRegionGroup) {
- return schemaRegionDataNodesMap.get(schemaRegionGroup);
+ public void setSchemaRegionReplicaSet(
+ String storageGroup, int deviceGroupId, SchemaRegionReplicaSet
schemaRegionReplicaSet) {
+ schemaPartitionInfo
+ .computeIfAbsent(storageGroup, value -> new HashMap<>())
+ .put(deviceGroupId, schemaRegionReplicaSet);
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
similarity index 58%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
index 3595598..9bde9be 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
@@ -18,39 +18,24 @@
*/
package org.apache.iotdb.confignode.partition;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class SchemaPartitionInfo {
+public class SchemaRegionInfo {
// TODO: Serialize and Deserialize
- // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionID>>
- private final Map<String, Map<Integer, Integer>> schemaPartitionTable;
- // TODO: Serialize and Deserialize
// Map<SchemaRegionID, List<DataNodeID>>
- private final Map<Integer, List<Integer>> schemaRegionDataNodesMap;
+ private Map<Integer, List<Integer>> schemaRegionDataNodesMap;
- public SchemaPartitionInfo() {
- this.schemaPartitionTable = new HashMap<>();
+ public SchemaRegionInfo() {
this.schemaRegionDataNodesMap = new HashMap<>();
}
- public void createSchemaPartition(String storageGroup, int deviceGroup, int
schemaRegion) {
- if (!schemaPartitionTable.containsKey(storageGroup)) {
- schemaPartitionTable.put(storageGroup, new HashMap<>());
- }
- schemaPartitionTable.get(storageGroup).put(deviceGroup, schemaRegion);
- }
-
- public Integer getSchemaPartition(String storageGroup, int deviceGroup) {
- if (schemaPartitionTable.containsKey(storageGroup)) {
- return schemaPartitionTable.get(storageGroup).get(deviceGroup);
- }
- return null;
- }
-
- public void createSchemaRegion(int schemaRegion, List<Integer> dataNode) {
+ public void addSchemaRegion(int schemaRegion, List<Integer> dataNode) {
if (!schemaRegionDataNodesMap.containsKey(schemaRegion)) {
schemaRegionDataNodesMap.put(schemaRegion, dataNode);
}
@@ -59,4 +44,16 @@ public class SchemaPartitionInfo {
public List<Integer> getSchemaRegionLocation(int schemaRegionGroup) {
return schemaRegionDataNodesMap.get(schemaRegionGroup);
}
+
+ public Map<Integer, List<Integer>> getSchemaRegionDataNodesMap() {
+ return schemaRegionDataNodesMap;
+ }
+
+ public void serializeImpl(ByteBuffer buffer) {
+ SerializeDeserializeUtil.writeIntMapLists(schemaRegionDataNodesMap,
buffer);
+ }
+
+ public void deserializeImpl(ByteBuffer buffer) {
+ schemaRegionDataNodesMap =
SerializeDeserializeUtil.readIntMapLists(buffer);
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionReplicaSet.java
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionReplicaSet.java
new file mode 100644
index 0000000..754af29
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionReplicaSet.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.partition;
+
+import org.apache.iotdb.consensus.common.Endpoint;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SchemaRegionReplicaSet {
+ private int schemaRegionId;
+ private List<Endpoint> endPointList;
+
+ public int getSchemaRegionId() {
+ return schemaRegionId;
+ }
+
+ public void setSchemaRegionId(int schemaRegionId) {
+ this.schemaRegionId = schemaRegionId;
+ }
+
+ public List<Endpoint> getEndPointList() {
+ return endPointList;
+ }
+
+ public void setEndPointList(List<Endpoint> endPointList) {
+ this.endPointList = endPointList;
+ }
+
+ public void serializeImpl(ByteBuffer buffer) {
+ buffer.putInt(schemaRegionId);
+
+ buffer.putInt(endPointList.size());
+ endPointList.forEach(
+ endpoint -> {
+ endpoint.serializeImpl(buffer);
+ });
+ }
+
+  /** Reads the fields written by {@link #serializeImpl(ByteBuffer)} from the buffer. */
+  public void deserializeImpl(ByteBuffer buffer) {
+    schemaRegionId = buffer.getInt();
+    int size = buffer.getInt();
+
+    // BUGFIX: always start from a fresh list; appending to an existing list made
+    // repeated deserialization accumulate duplicate endpoints
+    endPointList = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      Endpoint endpoint = new Endpoint();
+      endpoint.deserializeImpl(buffer);
+      endPointList.add(endpoint);
+    }
+  }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("SchemaRegionReplicaSet {");
+ sb.append("schemaRegionId = ").append(schemaRegionId);
+ endPointList.forEach(
+ endpoint -> {
+ sb.append(", EndPoint = ").append(endpoint);
+ });
+ sb.append("}");
+ return sb.toString();
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
new file mode 100644
index 0000000..0da1ee9
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
+import org.apache.iotdb.confignode.partition.DataNodeInfo;
+import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DataNodeInfoPersistence {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeInfoPersistence.class);
+
+ /** online data nodes */
+ private final ConcurrentNavigableMap<Integer, DataNodeInfo> onlineDataNodes =
+ new ConcurrentSkipListMap();
+
+ /** For remove node or draining node */
+ private Set<DataNodeInfo> drainingDataNodes = new HashSet<>();
+
+ private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
+
+ private DataNodeInfoPersistence() {
+ this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
+ }
+
+ public ConcurrentNavigableMap<Integer, DataNodeInfo> getOnlineDataNodes() {
+ return onlineDataNodes;
+ }
+
+ public boolean containsValue(DataNodeInfo info) {
+ return onlineDataNodes.containsValue(info);
+ }
+
+ public void put(int dataNodeID, DataNodeInfo info) {
+ onlineDataNodes.put(dataNodeID, info);
+ }
+
+  /**
+   * Look up the registered id of the given data node.
+   *
+   * @param info the data node to search for (matched by equals)
+   * @return the registered data node id, or -1 if the node is not registered
+   */
+  public int getDataNodeInfo(DataNodeInfo info) {
+    // TODO: optimize
+    for (Map.Entry<Integer, DataNodeInfo> entry : onlineDataNodes.entrySet()) {
+      if (entry.getValue().equals(info)) {
+        // BUGFIX: return the registered key, not info.getDataNodeID() — the id on the
+        // passed-in query object may be unset and differ from the stored registration
+        return entry.getKey();
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Register data node info when a data node starts.
+   *
+   * @param plan RegisterDataNodePlan
+   * @return SUCCESS_STATUS if the data node registers for the first time
+   */
+ public TSStatus registerDataNode(RegisterDataNodePlan plan) {
+ TSStatus result;
+ DataNodeInfo info = plan.getInfo();
+ dataNodeInfoReadWriteLock.writeLock().lock();
+ try {
+ if (onlineDataNodes.containsValue(info)) {
+ result = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ result.setMessage(
+ String.format(
+ "DataNode %s is already registered.",
plan.getInfo().getEndPoint().toString()));
+ } else {
+ onlineDataNodes.put(info.getDataNodeID(), info);
+ result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ result.setMessage(String.valueOf(info.getDataNodeID()));
+ LOGGER.info("Register data node success, data node is {}", plan);
+ }
+ } finally {
+ dataNodeInfoReadWriteLock.writeLock().unlock();
+ }
+ return result;
+ }
+
+  /**
+   * Get data node info.
+   *
+   * @param plan QueryDataNodeInfoPlan
+   * @return all data node info if dataNodeId of plan is -1
+   */
+ public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
+ DataNodesInfoDataSet result = new DataNodesInfoDataSet();
+ int dataNodeId = plan.getDataNodeID();
+ dataNodeInfoReadWriteLock.readLock().lock();
+ try {
+ if (dataNodeId == -1) {
+ result.setInfoList(new ArrayList<>(onlineDataNodes.values()));
+ } else {
+
result.setInfoList(Collections.singletonList(onlineDataNodes.get(dataNodeId)));
+ }
+ } finally {
+ dataNodeInfoReadWriteLock.readLock().unlock();
+ }
+
+ return result;
+ }
+
+ public Set<Integer> getDataNodeIds() {
+ return onlineDataNodes.keySet();
+ }
+
+ /**
+ * Add schema region group
+ *
+ * @param dataNodeId data node id
+ * @param schemaRegionGroup schema region group
+ */
+ public void addSchemaRegionGroup(int dataNodeId, int schemaRegionGroup) {
+ dataNodeInfoReadWriteLock.writeLock().lock();
+ try {
+ if (onlineDataNodes.containsKey(dataNodeId)) {
+
onlineDataNodes.get(dataNodeId).addSchemaRegionGroup(schemaRegionGroup);
+ }
+ } finally {
+ dataNodeInfoReadWriteLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Add data region group
+ *
+ * @param dataNodeId data node id
+ * @param dataRegionGroup data region group
+ */
+ public void addDataRegionGroup(int dataNodeId, int dataRegionGroup) {
+ dataNodeInfoReadWriteLock.writeLock().lock();
+ try {
+ if (onlineDataNodes.containsKey(dataNodeId)) {
+ onlineDataNodes.get(dataNodeId).addSchemaRegionGroup(dataRegionGroup);
+ }
+ } finally {
+ dataNodeInfoReadWriteLock.writeLock().unlock();
+ }
+ }
+
+ @TestOnly
+ public void clear() {
+ onlineDataNodes.clear();
+ drainingDataNodes.clear();
+ }
+
+ private static class DataNodeInfoPersistenceHolder {
+
+ private static final DataNodeInfoPersistence INSTANCE = new
DataNodeInfoPersistence();
+
+ private DataNodeInfoPersistenceHolder() {
+ // empty constructor
+ }
+ }
+
+ public static DataNodeInfoPersistence getInstance() {
+ return DataNodeInfoPersistence.DataNodeInfoPersistenceHolder.INSTANCE;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
new file mode 100644
index 0000000..578a22d
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
+import org.apache.iotdb.confignode.partition.DataPartitionInfo;
+import org.apache.iotdb.confignode.partition.SchemaPartitionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionReplicaSet;
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** manage data partition and schema partition */
+public class PartitionInfoPersistence {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionInfoPersistence.class);
+
+ /** schema partition read write lock */
+ private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
+
+ /** data partition read write lock */
+ private final ReentrantReadWriteLock dataPartitionReadWriteLock;
+
+ // TODO: Serialize and Deserialize
+ private final SchemaPartitionInfo schemaPartition;
+
+ // TODO: Serialize and Deserialize
+ private final DataPartitionInfo dataPartition;
+
+ public PartitionInfoPersistence() {
+ this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
+ this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
+ this.schemaPartition = new SchemaPartitionInfo();
+ this.dataPartition = new DataPartitionInfo();
+ }
+
+ /**
+ * Get schema partition
+ *
+ * @param physicalPlan storageGroup and deviceGroupIDs
+ * @return Empty Data Set if does not exist
+ */
+ public DataSet getSchemaPartition(SchemaPartitionPlan physicalPlan) {
+ SchemaPartitionDataSet schemaPartitionDataSet = new
SchemaPartitionDataSet();
+ schemaPartitionReadWriteLock.readLock().lock();
+ try {
+ String storageGroup = physicalPlan.getStorageGroup();
+ List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
+ SchemaPartitionInfo schemaPartitionInfo = new SchemaPartitionInfo();
+ schemaPartitionInfo.setSchemaPartitionInfo(
+ schemaPartition.getSchemaPartition(storageGroup, deviceGroupIDs));
+ schemaPartitionDataSet.setSchemaPartitionInfo(schemaPartitionInfo);
+ } finally {
+ schemaPartitionReadWriteLock.readLock().unlock();
+ }
+ return schemaPartitionDataSet;
+ }
+
+ /**
+ * If does not exist, apply a new schema partition
+ *
+ * @param physicalPlan storage group and device group id
+ * @return Schema Partition data set
+ */
+ public DataSet applySchemaPartition(SchemaPartitionPlan physicalPlan) {
+ String storageGroup = physicalPlan.getStorageGroup();
+ List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
+ List<Integer> noAssignDeviceGroupId =
+ schemaPartition.filterNoAssignDeviceGroupId(storageGroup,
deviceGroupIDs);
+
+ // allocate partition by storage group and device group id
+ Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets =
+ physicalPlan.getDeviceGroupIdReplicaSets();
+ schemaPartitionReadWriteLock.writeLock().lock();
+ try {
+
+ deviceGroupIdReplicaSets
+ .entrySet()
+ .forEach(
+ entity -> {
+ schemaPartition.setSchemaRegionReplicaSet(
+ storageGroup, entity.getKey(), entity.getValue());
+ });
+ } finally {
+ schemaPartitionReadWriteLock.writeLock().unlock();
+ }
+
+ return getSchemaPartition(physicalPlan);
+ }
+
+ /**
+ * TODO:allocate schema partition by balancer
+ *
+ * @param physicalPlan physical plan
+ * @return data set
+ */
+ public DataSet applyDataPartition(DataPartitionPlan physicalPlan) {
+ return null;
+ }
+
+ public DataSet getDataPartition(DataPartitionPlan physicalPlan) {
+ return null;
+ }
+
+ public List<Integer> filterSchemaRegionNoAssignDeviceGroupId(
+ String storageGroup, List<Integer> deviceGroupIDs) {
+ return schemaPartition.filterNoAssignDeviceGroupId(storageGroup,
deviceGroupIDs);
+ }
+
+ @TestOnly
+ public void clear() {
+ if (schemaPartition.getSchemaPartitionInfo() != null) {
+ schemaPartition.getSchemaPartitionInfo().clear();
+ }
+
+ if (dataPartition.getDataPartitionMap() != null) {
+ dataPartition.getDataPartitionMap().clear();
+ }
+ }
+
+ private static class PartitionInfoPersistenceHolder {
+
+ private static final PartitionInfoPersistence INSTANCE = new
PartitionInfoPersistence();
+
+ private PartitionInfoPersistenceHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PartitionInfoPersistence getInstance() {
+ return PartitionInfoPersistence.PartitionInfoPersistenceHolder.INSTANCE;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
new file mode 100644
index 0000000..c67fa1d
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import
org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
+import org.apache.iotdb.confignode.partition.DataRegionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionReplicaSet;
+import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** manage storage group schemas, schema regions and data regions */
+public class RegionInfoPersistence {
+
+ /** partition read write lock */
+ private final ReentrantReadWriteLock partitionReadWriteLock;
+
+ // TODO: Serialize and Deserialize
+ // storageGroupName -> StorageGroupSchema
+ private final Map<String, StorageGroupSchema> storageGroupsMap;
+
+ // TODO: Serialize and Deserialize
+ private int nextSchemaRegionGroup = 0;
+ // TODO: Serialize and Deserialize
+ private int nextDataRegionGroup = 0;
+
+ // TODO: Serialize and Deserialize
+ private final SchemaRegionInfo schemaRegion;
+
+ // TODO: Serialize and Deserialize
+ private final DataRegionInfo dataRegion;
+
+ public RegionInfoPersistence() {
+ this.partitionReadWriteLock = new ReentrantReadWriteLock();
+ this.storageGroupsMap = new HashMap<>();
+ this.schemaRegion = new SchemaRegionInfo();
+ this.dataRegion = new DataRegionInfo();
+ }
+
+ /**
+ * 1. region allocation 2. add to storage group map
+ *
+ * @param plan SetStorageGroupPlan
+ * @return TSStatusCode.SUCCESS_STATUS if region allocate
+ */
+ public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
+ TSStatus result;
+ partitionReadWriteLock.writeLock().lock();
+ try {
+ if (storageGroupsMap.containsKey(plan.getSchema().getName())) {
+ result = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ result.setMessage(
+ String.format("StorageGroup %s is already set.",
plan.getSchema().getName()));
+ } else {
+ StorageGroupSchema schema = new
StorageGroupSchema(plan.getSchema().getName());
+ storageGroupsMap.put(schema.getName(), schema);
+
+ plan.getSchemaRegionInfo()
+ .getSchemaRegionDataNodesMap()
+ .entrySet()
+ .forEach(
+ entity -> {
+ schemaRegion.addSchemaRegion(entity.getKey(),
entity.getValue());
+ entity
+ .getValue()
+ .forEach(
+ dataNodeId -> {
+ DataNodeInfoPersistence.getInstance()
+ .addSchemaRegionGroup(dataNodeId,
entity.getKey());
+ });
+ });
+
+ plan.getDataRegionInfo()
+ .getDataRegionDataNodesMap()
+ .entrySet()
+ .forEach(
+ entity -> {
+ dataRegion.createDataRegion(entity.getKey(),
entity.getValue());
+ entity
+ .getValue()
+ .forEach(
+ dataNodeId -> {
+ DataNodeInfoPersistence.getInstance()
+ .addDataRegionGroup(dataNodeId,
entity.getKey());
+ });
+ });
+
+ result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+ } finally {
+ partitionReadWriteLock.writeLock().unlock();
+ }
+ return result;
+ }
+
+ public StorageGroupSchemaDataSet getStorageGroupSchema() {
+ StorageGroupSchemaDataSet result = new StorageGroupSchemaDataSet();
+ partitionReadWriteLock.readLock().lock();
+ try {
+ result.setSchemaList(new ArrayList<>(storageGroupsMap.values()));
+ } finally {
+ partitionReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
+  /** @return list of schema region replica sets, each holding a schema region id and its endpoint list */
+ public List<SchemaRegionReplicaSet> getSchemaRegionEndPoint() {
+ List<SchemaRegionReplicaSet> schemaRegionEndPoints = new ArrayList<>();
+
+ schemaRegion
+ .getSchemaRegionDataNodesMap()
+ .entrySet()
+ .forEach(
+ entity -> {
+ SchemaRegionReplicaSet schemaRegionReplicaSet = new
SchemaRegionReplicaSet();
+ List<Endpoint> endPoints = new ArrayList<>();
+ entity
+ .getValue()
+ .forEach(
+ dataNodeId -> {
+ if (DataNodeInfoPersistence.getInstance()
+ .getOnlineDataNodes()
+ .containsKey(dataNodeId)) {
+ endPoints.add(
+ DataNodeInfoPersistence.getInstance()
+ .getOnlineDataNodes()
+ .get(dataNodeId)
+ .getEndPoint());
+ }
+ });
+ schemaRegionReplicaSet.setSchemaRegionId(entity.getKey());
+ schemaRegionReplicaSet.setEndPointList(endPoints);
+ schemaRegionEndPoints.add(schemaRegionReplicaSet);
+ });
+ return schemaRegionEndPoints;
+ }
+
+ public boolean containsStorageGroup(String storageName) {
+ return storageGroupsMap.containsKey(storageName);
+ }
+
+ @TestOnly
+ public void clear() {
+ storageGroupsMap.clear();
+ schemaRegion.getSchemaRegionDataNodesMap().clear();
+ dataRegion.getDataRegionDataNodesMap().clear();
+ }
+
+ private static class RegionInfoPersistenceHolder {
+
+ private static final RegionInfoPersistence INSTANCE = new
RegionInfoPersistence();
+
+ private RegionInfoPersistenceHolder() {
+ // empty constructor
+ }
+ }
+
+ public static RegionInfoPersistence getInstance() {
+ return RegionInfoPersistence.RegionInfoPersistenceHolder.INSTANCE;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
index 858cc9d..abeccbe 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
@@ -18,9 +18,11 @@
*/
package org.apache.iotdb.confignode.physical;
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -92,6 +94,18 @@ public abstract class PhysicalPlan implements
IConsensusRequest {
case QueryStorageGroupSchema:
plan = new QueryStorageGroupSchemaPlan();
break;
+ case QueryDataPartition:
+ plan = new DataPartitionPlan(PhysicalPlanType.QueryDataPartition);
+ break;
+ case ApplyDataPartition:
+ plan = new DataPartitionPlan(PhysicalPlanType.ApplyDataPartition);
+ break;
+ case QuerySchemaPartition:
+ plan = new
SchemaPartitionPlan(PhysicalPlanType.QuerySchemaPartition);
+ break;
+ case ApplySchemaPartition:
+ plan = new
SchemaPartitionPlan(PhysicalPlanType.ApplySchemaPartition);
+ break;
default:
throw new IOException("unknown PhysicalPlan type: " + typeNum);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
index 8706e22..d7606b4 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
@@ -24,5 +24,9 @@ public enum PhysicalPlanType {
SetStorageGroup,
DeleteStorageGroup,
QueryStorageGroupSchema,
- CreateRegion
+ CreateRegion,
+ QueryDataPartition,
+ ApplyDataPartition,
+ QuerySchemaPartition,
+ ApplySchemaPartition
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
new file mode 100644
index 0000000..556944e
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.sys;
+
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * query DataPartition or apply DataPartition by the specific storageGroup and
+ * deviceGroupStartTimeMap.
+ */
+public class DataPartitionPlan extends PhysicalPlan {
+ private String storageGroup;
+ private Map<Integer, List<Integer>> deviceGroupStartTimeMap;
+
+ public DataPartitionPlan(PhysicalPlanType physicalPlanType) {
+ super(physicalPlanType);
+ }
+
+ public DataPartitionPlan(
+ PhysicalPlanType physicalPlanType,
+ String storageGroup,
+ Map<Integer, List<Integer>> deviceGroupStartTimeMap) {
+ this(physicalPlanType);
+ this.storageGroup = storageGroup;
+ this.deviceGroupStartTimeMap = deviceGroupStartTimeMap;
+ }
+
+ public String getStorageGroup() {
+ return storageGroup;
+ }
+
+ public void setStorageGroup(String storageGroup) {
+ this.storageGroup = storageGroup;
+ }
+
+ public Map<Integer, List<Integer>> getDeviceGroupIDs() {
+ return deviceGroupStartTimeMap;
+ }
+
+ public void setDeviceGroupIDs(Map<Integer, List<Integer>> deviceGroupIDs) {
+ this.deviceGroupStartTimeMap = deviceGroupIDs;
+ }
+
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    // BUGFIX: write the plan's actual type instead of hard-coding QueryDataPartition,
+    // otherwise an ApplyDataPartition plan round-trips as a query plan
+    buffer.putInt(getType().ordinal());
+    SerializeDeserializeUtil.write(storageGroup, buffer);
+    SerializeDeserializeUtil.writeIntMapLists(deviceGroupStartTimeMap, buffer);
+  }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) {
+ storageGroup = SerializeDeserializeUtil.readString(buffer);
+ deviceGroupStartTimeMap = SerializeDeserializeUtil.readIntMapLists(buffer);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
new file mode 100644
index 0000000..adf9c45
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.sys;
+
+import org.apache.iotdb.confignode.partition.SchemaRegionReplicaSet;
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Query or apply a SchemaPartition for the specific storage group and device group ids. */
+public class SchemaPartitionPlan extends PhysicalPlan {
+ private String storageGroup;
+ private List<Integer> deviceGroupIDs;
+ private Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets;
+
+ public SchemaPartitionPlan(PhysicalPlanType physicalPlanType) {
+ super(physicalPlanType);
+ }
+
+ public SchemaPartitionPlan(
+ PhysicalPlanType physicalPlanType, String storageGroup, List<Integer>
deviceGroupIDs) {
+ this(physicalPlanType);
+ this.storageGroup = storageGroup;
+ this.deviceGroupIDs = deviceGroupIDs;
+ }
+
+ public void setDeviceGroupIdReplicaSet(
+ Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets) {
+ this.deviceGroupIdReplicaSets = deviceGroupIdReplicaSets;
+ }
+
+ public Map<Integer, SchemaRegionReplicaSet> getDeviceGroupIdReplicaSets() {
+ return deviceGroupIdReplicaSets;
+ }
+
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    // BUGFIX: write the plan's actual type (Query/ApplySchemaPartition); it was hard-coded
+    // to QueryDataPartition, which made PhysicalPlan.create dispatch a serialized schema
+    // partition plan to the wrong plan class on deserialization
+    buffer.putInt(getType().ordinal());
+    SerializeDeserializeUtil.write(storageGroup, buffer);
+    buffer.putInt(deviceGroupIDs.size());
+    deviceGroupIDs.forEach(id -> SerializeDeserializeUtil.write(id, buffer));
+
+    buffer.putInt(deviceGroupIdReplicaSets.size());
+    for (Map.Entry<Integer, SchemaRegionReplicaSet> entry : deviceGroupIdReplicaSets.entrySet()) {
+      buffer.putInt(entry.getKey());
+      entry.getValue().serializeImpl(buffer);
+    }
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) {
+    storageGroup = SerializeDeserializeUtil.readString(buffer);
+    int idSize = SerializeDeserializeUtil.readInt(buffer);
+    // BUGFIX: deviceGroupIDs was never initialized before add() — NPE on a plan created
+    // by PhysicalPlan.create via the single-argument constructor
+    if (deviceGroupIDs == null) {
+      deviceGroupIDs = new java.util.ArrayList<>();
+    }
+    for (int i = 0; i < idSize; i++) {
+      deviceGroupIDs.add(SerializeDeserializeUtil.readInt(buffer));
+    }
+
+    if (deviceGroupIdReplicaSets == null) {
+      deviceGroupIdReplicaSets = new HashMap<>();
+    }
+    int size = buffer.getInt();
+
+    for (int i = 0; i < size; i++) {
+      // BUGFIX: read the key before the value — serializeImpl writes key then value, but
+      // this code read the replica set first, consuming the key bytes as schemaRegionId
+      int deviceGroupId = buffer.getInt();
+      SchemaRegionReplicaSet schemaRegionReplicaSet = new SchemaRegionReplicaSet();
+      schemaRegionReplicaSet.deserializeImpl(buffer);
+      deviceGroupIdReplicaSets.put(deviceGroupId, schemaRegionReplicaSet);
+    }
+  }
+
+ public String getStorageGroup() {
+ return storageGroup;
+ }
+
+ public List<Integer> getDeviceGroupIDs() {
+ return deviceGroupIDs;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
index 1b8ec41..bf7a7e9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.confignode.physical.sys;
+import org.apache.iotdb.confignode.partition.DataRegionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.PhysicalPlanType;
@@ -28,6 +30,10 @@ public class SetStorageGroupPlan extends PhysicalPlan {
private StorageGroupSchema schema;
+ private SchemaRegionInfo schemaRegionInfo;
+
+ private DataRegionInfo dataRegionInfo;
+
public SetStorageGroupPlan() {
super(PhysicalPlanType.SetStorageGroup);
this.schema = new StorageGroupSchema();
@@ -42,14 +48,38 @@ public class SetStorageGroupPlan extends PhysicalPlan {
return schema;
}
+ public void setSchema(StorageGroupSchema schema) {
+ this.schema = schema;
+ }
+
+ public SchemaRegionInfo getSchemaRegionInfo() {
+ return schemaRegionInfo;
+ }
+
+ public void setSchemaRegionInfo(SchemaRegionInfo schemaRegionInfo) {
+ this.schemaRegionInfo = schemaRegionInfo;
+ }
+
+ public DataRegionInfo getDataRegionInfo() {
+ return dataRegionInfo;
+ }
+
+ public void setDataRegionInfo(DataRegionInfo dataRegionInfo) {
+ this.dataRegionInfo = dataRegionInfo;
+ }
+
@Override
protected void serializeImpl(ByteBuffer buffer) {
buffer.putInt(PhysicalPlanType.SetStorageGroup.ordinal());
schema.serialize(buffer);
+ schemaRegionInfo.serializeImpl(buffer);
+ dataRegionInfo.serializeImpl(buffer);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) {
schema.deserialize(buffer);
+ schemaRegionInfo.deserializeImpl(buffer);
+ dataRegionInfo.deserializeImpl(buffer);
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/balancer/LoadBalancer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/balancer/LoadBalancer.java
index 4cfb8b3..a7e4472 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/balancer/LoadBalancer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/balancer/LoadBalancer.java
@@ -18,24 +18,12 @@
*/
package org.apache.iotdb.confignode.service.balancer;
-import org.apache.iotdb.confignode.partition.PartitionTable;
-
-import java.util.concurrent.locks.Lock;
-
/**
* The LoadBalancer at ConfigNodeGroup-Leader is active for cluster dynamic
load balancing
* scheduling
*/
public class LoadBalancer implements Runnable {
- private final Lock partitionTableLock;
- private final PartitionTable partitionTable;
-
- public LoadBalancer(Lock partitionTableLock, PartitionTable partitionTable) {
- this.partitionTableLock = partitionTableLock;
- this.partitionTable = partitionTable;
- }
-
@Override
public void run() {}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
index c9c90af..6f4c14c 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
@@ -19,28 +19,46 @@
package org.apache.iotdb.confignode.service.executor;
import
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
-import org.apache.iotdb.confignode.partition.PartitionTable;
+import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
+import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
+import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
public class PlanExecutor {
- private final PartitionTable partitionTable;
+ private final DataNodeInfoPersistence dataNodeInfoPersistence;
+
+ private final RegionInfoPersistence regionInfoPersistence;
+
+ private final PartitionInfoPersistence partitionInfoPersistence;
public PlanExecutor() {
- this.partitionTable = new PartitionTable();
+ this.dataNodeInfoPersistence = DataNodeInfoPersistence.getInstance();
+ this.regionInfoPersistence = RegionInfoPersistence.getInstance();
+ this.partitionInfoPersistence = PartitionInfoPersistence.getInstance();
}
public DataSet executorQueryPlan(PhysicalPlan plan) throws
UnknownPhysicalPlanTypeException {
switch (plan.getType()) {
case QueryDataNodeInfo:
- return partitionTable.getDataNodeInfo((QueryDataNodeInfoPlan) plan);
+ return dataNodeInfoPersistence.getDataNodeInfo((QueryDataNodeInfoPlan)
plan);
case QueryStorageGroupSchema:
- return partitionTable.getStorageGroupSchema();
+ return regionInfoPersistence.getStorageGroupSchema();
+ case QueryDataPartition:
+ return partitionInfoPersistence.getDataPartition((DataPartitionPlan)
plan);
+ case QuerySchemaPartition:
+ return
partitionInfoPersistence.getSchemaPartition((SchemaPartitionPlan) plan);
+ case ApplySchemaPartition:
+ return
partitionInfoPersistence.applySchemaPartition((SchemaPartitionPlan) plan);
+ case ApplyDataPartition:
+ return partitionInfoPersistence.applyDataPartition((DataPartitionPlan)
plan);
default:
throw new UnknownPhysicalPlanTypeException(plan.getType());
}
@@ -49,9 +67,9 @@ public class PlanExecutor {
public TSStatus executorNonQueryPlan(PhysicalPlan plan) throws
UnknownPhysicalPlanTypeException {
switch (plan.getType()) {
case RegisterDataNode:
- return partitionTable.registerDataNode((RegisterDataNodePlan) plan);
+ return dataNodeInfoPersistence.registerDataNode((RegisterDataNodePlan)
plan);
case SetStorageGroup:
- return partitionTable.setStorageGroup((SetStorageGroupPlan) plan);
+ return regionInfoPersistence.setStorageGroup((SetStorageGroupPlan)
plan);
default:
throw new UnknownPhysicalPlanTypeException(plan.getType());
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 9619301..1d8dc48 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -19,13 +19,15 @@
package org.apache.iotdb.confignode.service.thrift.server;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
import
org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.partition.DataNodeInfo;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
@@ -45,9 +47,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo;
import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
+import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Endpoint;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -62,7 +63,6 @@ import java.util.Map;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode
*/
public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigNodeRPCServerProcessor.class);
private final ConfigManager configManager;
@@ -77,17 +77,17 @@ public class ConfigNodeRPCServerProcessor implements
ConfigIService.Iface {
RegisterDataNodePlan plan =
new RegisterDataNodePlan(
-1, new Endpoint(req.getEndPoint().getIp(),
req.getEndPoint().getPort()));
- ConsensusWriteResponse resp = configManager.write(plan);
+ TSStatus status = configManager.registerDataNode(plan);
DataNodeRegisterResp result = new DataNodeRegisterResp();
- result.setRegisterResult(resp.getStatus());
- if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- result.setDataNodeID(Integer.parseInt(resp.getStatus().getMessage()));
+ result.setRegisterResult(status);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ result.setDataNodeID(Integer.parseInt(status.getMessage()));
LOGGER.info(
"Register DataNode successful. DataNodeID: {}, {}",
- resp.getStatus().getMessage(),
+ status.getMessage(),
req.getEndPoint().toString());
} else {
- LOGGER.error("Register DataNode failed. {}",
resp.getStatus().getMessage());
+ LOGGER.error("Register DataNode failed. {}", status.getMessage());
}
return result;
}
@@ -95,13 +95,13 @@ public class ConfigNodeRPCServerProcessor implements
ConfigIService.Iface {
@Override
public Map<Integer, DataNodeMessage> getDataNodesMessage(int dataNodeID)
throws TException {
QueryDataNodeInfoPlan plan = new QueryDataNodeInfoPlan(dataNodeID);
- ConsensusReadResponse resp = configManager.read(plan);
+ DataSet dataSet = configManager.getDataNodeInfo(plan);
- if (resp.getDataset() == null) {
+ if (dataSet == null) {
return new HashMap<>();
} else {
Map<Integer, DataNodeMessage> result = new HashMap<>();
- for (DataNodeInfo info : ((DataNodesInfoDataSet)
resp.getDataset()).getInfoList()) {
+ for (DataNodeInfo info : ((DataNodesInfoDataSet) dataSet).getInfoList())
{
result.put(
info.getDataNodeID(),
new DataNodeMessage(
@@ -117,7 +117,8 @@ public class ConfigNodeRPCServerProcessor implements
ConfigIService.Iface {
SetStorageGroupPlan plan =
new SetStorageGroupPlan(
new
org.apache.iotdb.confignode.partition.StorageGroupSchema(req.getStorageGroup()));
- TSStatus resp = configManager.write(plan).getStatus();
+
+ TSStatus resp = configManager.setStorageGroup(plan);
if (resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info("Set StorageGroup {} successful.", req.getStorageGroup());
} else {
@@ -133,14 +134,13 @@ public class ConfigNodeRPCServerProcessor implements
ConfigIService.Iface {
@Override
public Map<String, StorageGroupMessage> getStorageGroupsMessage() throws
TException {
- ConsensusReadResponse resp = configManager.read(new
QueryStorageGroupSchemaPlan());
+ DataSet dataSet = configManager.getStorageGroupSchema();
- if (resp.getDataset() == null) {
+ if (dataSet == null) {
return new HashMap<>();
} else {
Map<String, StorageGroupMessage> result = new HashMap<>();
- for (StorageGroupSchema schema :
- ((StorageGroupSchemaDataSet) resp.getDataset()).getSchemaList()) {
+ for (StorageGroupSchema schema : ((StorageGroupSchemaDataSet)
dataSet).getSchemaList()) {
result.put(schema.getName(), new
StorageGroupMessage(schema.getName()));
}
return result;
@@ -149,16 +149,39 @@ public class ConfigNodeRPCServerProcessor implements
ConfigIService.Iface {
@Override
public DeviceGroupHashInfo getDeviceGroupHashInfo() throws TException {
+ return configManager.getDeviceGroupHashInfo();
+ }
+
+ @Override
+ public DataPartitionInfo applyDataPartition(GetDataPartitionReq req) throws
TException {
return null;
}
@Override
+ public SchemaPartitionInfo applySchemaPartition(GetSchemaPartitionReq req)
throws TException {
+ SchemaPartitionPlan applySchemaPartitionPlan =
+ new SchemaPartitionPlan(
+ PhysicalPlanType.ApplySchemaPartition, req.getStorageGroup(),
req.getDeviceGroupIDs());
+ DataSet dataSet =
configManager.applySchemaPartition(applySchemaPartitionPlan);
+ ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo();
+
+ return SchemaPartitionDataSet.convertRpcSchemaPartition(
+ ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo());
+ }
+
+ @Override
public SchemaPartitionInfo getSchemaPartition(GetSchemaPartitionReq req)
throws TException {
- return null;
+ SchemaPartitionPlan querySchemaPartitionPlan =
+ new SchemaPartitionPlan(
+ PhysicalPlanType.QuerySchemaPartition, req.getStorageGroup(),
req.getDeviceGroupIDs());
+ DataSet dataSet =
configManager.getSchemaPartition(querySchemaPartitionPlan);
+ return SchemaPartitionDataSet.convertRpcSchemaPartition(
+ ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo());
}
@Override
public DataPartitionInfo getDataPartition(GetDataPartitionReq req) throws
TException {
+
return null;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/util/SerializeDeserializeUtil.java
b/confignode/src/main/java/org/apache/iotdb/confignode/util/SerializeDeserializeUtil.java
new file mode 100644
index 0000000..7461827
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/util/SerializeDeserializeUtil.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SerializeDeserializeUtil {
+ public static final int INT_LEN = 4;
+
+ private SerializeDeserializeUtil() {}
+
+ /** read string from byteBuffer. */
+ public static String readString(ByteBuffer buffer) {
+ int strLength = readInt(buffer);
+ if (strLength < 0) {
+ return null;
+ } else if (strLength == 0) {
+ return "";
+ }
+ byte[] bytes = new byte[strLength];
+ buffer.get(bytes, 0, strLength);
+ return new String(bytes, 0, strLength);
+ }
+
+ /** read an int var from byteBuffer. */
+ public static int readInt(ByteBuffer buffer) {
+ return buffer.getInt();
+ }
+
+ /**
+ * write string to byteBuffer.
+ *
+ * @return the length of string represented by byte[].
+ */
+ public static int write(String s, ByteBuffer buffer) {
+ if (s == null) {
+ return write(-1, buffer);
+ }
+ int len = 0;
+ byte[] bytes = s.getBytes();
+ len += write(bytes.length, buffer);
+ buffer.put(bytes);
+ len += bytes.length;
+ return len;
+ }
+
+ /**
+ * write an int n to byteBuffer.
+ *
+ * @return The number of bytes used to represent n.
+ */
+ public static int write(int n, ByteBuffer buffer) {
+ buffer.putInt(n);
+ return INT_LEN;
+ }
+
+ /**
+ * write a map to buffer
+ *
+ * @param map Map<String, String>
+ * @param buffer
+ * @return length
+ */
+ public static int write(Map<String, String> map, ByteBuffer buffer) {
+ if (map == null) {
+ return write(-1, buffer);
+ }
+
+ int length = 0;
+ byte[] bytes;
+ buffer.putInt(map.size());
+ length += 4;
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ bytes = entry.getKey().getBytes();
+ buffer.putInt(bytes.length);
+ length += 4;
+ buffer.put(bytes);
+ length += bytes.length;
+ bytes = entry.getValue().getBytes();
+ buffer.putInt(bytes.length);
+ length += 4;
+ buffer.put(bytes);
+ length += bytes.length;
+ }
+ return length;
+ }
+
+ /**
+ * read map from buffer
+ *
+ * @param buffer ByteBuffer
+ * @return Map<String, String>
+ */
+ public static Map<String, String> readMap(ByteBuffer buffer) {
+ int length = readInt(buffer);
+ if (length == -1) {
+ return null;
+ }
+ Map<String, String> map = new HashMap<>(length);
+ for (int i = 0; i < length; i++) {
+ // key
+ String key = readString(buffer);
+ // value
+ String value = readString(buffer);
+ map.put(key, value);
+ }
+ return map;
+ }
+
+ /**
+ * write a map whose values are string lists to buffer
+ *
+ * @param map Map<String, List<String>>
+ * @param buffer
+ * @return length
+ */
+ public static int writeStringMapLists(Map<String, List<String>> map,
ByteBuffer buffer) {
+ if (map == null) {
+ return write(-1, buffer);
+ }
+
+ int length = 0;
+ byte[] bytes;
+ buffer.putInt(map.size());
+ for (Map.Entry<String, List<String>> entry : map.entrySet()) {
+ bytes = entry.getKey().getBytes();
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ buffer.putInt(entry.getValue().size());
+ entry
+ .getValue()
+ .forEach(
+ b -> {
+ buffer.putInt(b.length());
+ buffer.put(b.getBytes());
+ });
+ }
+ return length;
+ }
+
+ /**
+ * read a map whose values are string lists from buffer
+ *
+ * @param buffer ByteBuffer
+ * @return Map<String, List<String>>
+ */
+ public static Map<String, List<String>> readStringMapLists(ByteBuffer
buffer) {
+ int length = readInt(buffer);
+ if (length == -1) {
+ return null;
+ }
+ Map<String, List<String>> map = new HashMap<>(length);
+ for (int i = 0; i < length; i++) {
+ // key
+ String key = readString(buffer);
+ // value
+ int listSize = readInt(buffer);
+ List<String> valueList = new ArrayList<>();
+ for (int j = 0; j < listSize; j++) {
+ String value = readString(buffer);
+ valueList.add(value);
+ }
+ map.put(key, valueList);
+ }
+ return map;
+ }
+
+ /**
+ * write a map whose values are integer lists to buffer
+ *
+ * @param map Map<Integer, List<Integer>>
+ * @param buffer
+ * @return length
+ */
+ public static int writeIntMapLists(Map<Integer, List<Integer>> map,
ByteBuffer buffer) {
+ if (map == null) {
+ return write(-1, buffer);
+ }
+
+ int length = 0;
+ buffer.putInt(map.size());
+ for (Map.Entry<Integer, List<Integer>> entry : map.entrySet()) {
+ buffer.putInt(entry.getKey());
+ buffer.putInt(entry.getValue().size());
+ entry
+ .getValue()
+ .forEach(
+ b -> {
+ buffer.putInt(b);
+ });
+ }
+ return length;
+ }
+
+ /**
+ * read a map whose values are integer lists from buffer
+ *
+ * @param buffer ByteBuffer
+ * @return Map<Integer, List<Integer>>
+ */
+ public static Map<Integer, List<Integer>> readIntMapLists(ByteBuffer buffer)
{
+ int length = readInt(buffer);
+ if (length == -1) {
+ return null;
+ }
+ Map<Integer, List<Integer>> map = new HashMap<>(length);
+ for (int i = 0; i < length; i++) {
+ // key
+ int key = readInt(buffer);
+ // value
+ int listSize = readInt(buffer);
+ List<Integer> valueList = new ArrayList<>();
+ for (int j = 0; j < listSize; j++) {
+ int value = readInt(buffer);
+ valueList.add(value);
+ }
+ map.put(key, valueList);
+ }
+ return map;
+ }
+}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
index f99daa9..d9f9cbb 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.confignode.manager.hash;
-import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.ConsensusManager;
import java.io.IOException;
import java.util.ArrayList;
@@ -60,7 +60,7 @@ public class DeviceGroupHashExecutorManualTest {
}
public void GeneralIndexTest() throws IOException {
- ConfigManager manager = new ConfigManager();
+ ConsensusManager manager = new ConsensusManager();
int[] bucket = new int[deviceGroupCount];
Arrays.fill(bucket, 0);
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index 23fd30d..cf96e33 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -18,9 +18,16 @@
*/
package org.apache.iotdb.confignode.service.thrift.server;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
+import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
+import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
+import org.apache.iotdb.confignode.rpc.thrift.GetSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo;
import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -28,7 +35,9 @@ import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -39,6 +48,16 @@ import java.util.Map;
public class ConfigNodeRPCServerProcessorTest {
+ @Before
+ public void before() {}
+
+ @After
+ public void after() {
+ DataNodeInfoPersistence.getInstance().clear();
+ PartitionInfoPersistence.getInstance().clear();
+ RegionInfoPersistence.getInstance().clear();
+ }
+
@Test
public void registerDataNodeTest() throws TException, IOException {
ConfigNodeRPCServerProcessor processor = new
ConfigNodeRPCServerProcessor();
@@ -114,4 +133,136 @@ public class ConfigNodeRPCServerProcessorTest {
Assert.assertNotNull(messageMap.get(sg));
Assert.assertEquals(sg, messageMap.get(sg).getStorageGroup());
}
+
+ @Test
+ public void getDeviceGroupHashInfoTest() throws TException, IOException {
+ ConfigNodeRPCServerProcessor processor = new
ConfigNodeRPCServerProcessor();
+ // get Device Group hash
+ DeviceGroupHashInfo deviceGroupHashInfo = new DeviceGroupHashInfo();
+ deviceGroupHashInfo = processor.getDeviceGroupHashInfo();
+ Assert.assertEquals(
+ deviceGroupHashInfo.getDeviceGroupCount(),
+ ConfigNodeDescriptor.getInstance().getConf().getDeviceGroupCount());
+ Assert.assertEquals(
+ deviceGroupHashInfo.getHashClass(),
+
ConfigNodeDescriptor.getInstance().getConf().getDeviceGroupHashExecutorClass());
+ }
+
+ @Test
+ public void applySchemaPartitionTest() throws TException, IOException {
+ ConfigNodeRPCServerProcessor processor = new
ConfigNodeRPCServerProcessor();
+
+ TSStatus status;
+ final String sg = "root.sg0";
+
+ // failed because there are not enough DataNodes
+ SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+ status = processor.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
status.getCode());
+ Assert.assertEquals("DataNode is not enough, please register more.",
status.getMessage());
+
+ // register DataNodes
+ DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6667));
+ DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6668));
+ DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6669));
+ status = processor.registerDataNode(registerReq0).getRegisterResult();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ status = processor.registerDataNode(registerReq1).getRegisterResult();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ status = processor.registerDataNode(registerReq2).getRegisterResult();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ // set StorageGroup
+ status = processor.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ // applySchemaPartition
+ GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
+ List<Integer> deviceGroupIds = new ArrayList<>();
+ Integer deviceGroupId = 1000;
+ deviceGroupIds.add(deviceGroupId);
+
getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+ SchemaPartitionInfo schemaPartitionInfo =
processor.applySchemaPartition(getSchemaPartitionReq);
+ Assert.assertTrue(schemaPartitionInfo != null);
+
Assert.assertTrue(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg) !=
null);
+ schemaPartitionInfo
+ .getSchemaRegionDataNodesMap()
+ .get(sg)
+ .entrySet()
+ .forEach(
+ entity -> {
+ Assert.assertEquals(deviceGroupId, entity.getKey());
+ });
+ }
+
+ @Test
+ public void getSchemaPartitionTest() throws TException, IOException {
+ ConfigNodeRPCServerProcessor processor = new
ConfigNodeRPCServerProcessor();
+
+ TSStatus status;
+ final String sg = "root.sg0";
+
+ // failed because there are not enough DataNodes
+ SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+ status = processor.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
status.getCode());
+ Assert.assertEquals("DataNode is not enough, please register more.",
status.getMessage());
+
+ // register DataNodes
+ DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6667));
+ DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6668));
+ DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6669));
+ status = processor.registerDataNode(registerReq0).getRegisterResult();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ status = processor.registerDataNode(registerReq1).getRegisterResult();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ status = processor.registerDataNode(registerReq2).getRegisterResult();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ // set StorageGroup
+ status = processor.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ // getSchemaPartition
+ GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
+ List<Integer> deviceGroupIds = new ArrayList<>();
+ Integer deviceGroupId = 1000;
+ deviceGroupIds.add(deviceGroupId);
+
getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+ SchemaPartitionInfo schemaPartitionInfo =
processor.getSchemaPartition(getSchemaPartitionReq);
+ Assert.assertTrue(schemaPartitionInfo != null);
+
Assert.assertTrue(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg) !=
null);
+
+ // the schema partition has not been applied yet, so it is null
+ Assert.assertEquals(
+ null,
schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+
+ // applySchemaPartition
+ deviceGroupIds.add(deviceGroupId);
+
getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+ schemaPartitionInfo =
processor.applySchemaPartition(getSchemaPartitionReq);
+ Assert.assertTrue(schemaPartitionInfo != null);
+
Assert.assertTrue(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg) !=
null);
+ schemaPartitionInfo
+ .getSchemaRegionDataNodesMap()
+ .get(sg)
+ .entrySet()
+ .forEach(
+ entity -> {
+ Assert.assertEquals(deviceGroupId, entity.getKey());
+ });
+
+ // getSchemaPartition twice
+ getSchemaPartitionReq = new GetSchemaPartitionReq();
+ deviceGroupIds = new ArrayList<>();
+ deviceGroupIds.add(deviceGroupId);
+
getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+ schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
+ Assert.assertTrue(schemaPartitionInfo != null);
+
Assert.assertTrue(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg) !=
null);
+
+ // the schema partition has been applied, so it is not null
+ Assert.assertTrue(
+
schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId) !=
null);
+ }
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/utils/SerializeDeserializeUtilTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/utils/SerializeDeserializeUtilTest.java
new file mode 100644
index 0000000..26b2d75
--- /dev/null
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/utils/SerializeDeserializeUtilTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.utils;
+
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SerializeDeserializeUtilTest {
+ protected static final int DEFAULT_BUFFER_SIZE = 4096;
+
+ @Test
+ public void readWriteStringMapFromBufferTest() throws IOException {
+ // 1. read write map<String, String>
+ String key = "string";
+ String value = "string";
+ Map<String, String> map = new HashMap<>();
+ map.put(key, value);
+
+ ByteBuffer bf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ SerializeDeserializeUtil.write(map, bf);
+ byte[] b = bf.array();
+ bf.clear();
+ Map<String, String> result =
SerializeDeserializeUtil.readMap(ByteBuffer.wrap(b));
+ Assert.assertNotNull(result);
+ Assert.assertEquals(map, result);
+ }
+
+ @Test
+ public void readWriteStringMapListFromBufferTest() throws IOException {
+ String key = "string";
+ String value = "string";
+ // 2. read write map<String, List<String>>
+ Map<String, List<String>> stringMapLists = new HashMap<>();
+ List<String> keyLists = new ArrayList<>();
+ keyLists.add(value);
+ stringMapLists.put(key, keyLists);
+
+ ByteBuffer bf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ SerializeDeserializeUtil.writeStringMapLists(stringMapLists, bf);
+ byte[] b = bf.array();
+ bf.clear();
+ Map<String, List<String>> stringMapListsResult =
+ SerializeDeserializeUtil.readStringMapLists(ByteBuffer.wrap(b));
+ Assert.assertNotNull(stringMapListsResult);
+ Assert.assertEquals(stringMapLists, stringMapListsResult);
+ }
+
+ @Test
+ public void readWriteIntMapListFromBufferTest() throws IOException {
+ // 3. read write map<Integer, List<Integer>>
+ Map<Integer, List<Integer>> integerMapLists = new HashMap<>();
+ List<Integer> intLists = new ArrayList<>();
+ intLists.add(1);
+ integerMapLists.put(1, intLists);
+
+ ByteBuffer bf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ byte[] b = bf.array();
+
+ SerializeDeserializeUtil.writeIntMapLists(integerMapLists, bf);
+ Map<Integer, List<Integer>> intMapListsResult =
+ SerializeDeserializeUtil.readIntMapLists(ByteBuffer.wrap(b));
+ Assert.assertNotNull(intMapListsResult);
+ Assert.assertEquals(integerMapLists, intMapListsResult);
+ }
+}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
b/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
index b01b7e4..6e3951b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
@@ -19,13 +19,16 @@
package org.apache.iotdb.consensus.common;
+import java.nio.ByteBuffer;
import java.util.Objects;
// TODO Use a mature IDL framework such as Protobuf to manage this structure
public class Endpoint {
- private final String ip;
- private final int port;
+ private String ip;
+ private int port;
+
+ public Endpoint() {}
public Endpoint(String ip, int port) {
this.ip = ip;
@@ -40,6 +43,23 @@ public class Endpoint {
return port;
}
+ public void serializeImpl(ByteBuffer buffer) {
+ byte[] bytes = ip.getBytes();
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+
+ buffer.putInt(port);
+ }
+
+ public void deserializeImpl(ByteBuffer buffer) {
+ int length = buffer.getInt();
+ byte[] bytes = new byte[length];
+ buffer.get(bytes, 0, length);
+ ip = new String(bytes, 0, length);
+
+ port = buffer.getInt();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
index 119115c..bb629f0 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.commons.partition;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class SchemaPartitionInfo {
@@ -33,4 +35,19 @@ public class SchemaPartitionInfo {
Map<String, Map<DeviceGroupId, SchemaRegionReplicaSet>>
schemaPartitionInfo) {
this.schemaPartitionInfo = schemaPartitionInfo;
}
+
+ public Map<String, Map<Integer, SchemaRegionReplicaSet>> getSchemaPartition(
+ String storageGroup, List<Integer> deviceGroupIDs) {
+ Map<String, Map<Integer, SchemaRegionReplicaSet>> storageGroupMap = new
HashMap<>();
+ Map<Integer, SchemaRegionReplicaSet> deviceGroupMap = new HashMap<>();
+ deviceGroupIDs.forEach(
+ deviceGroupID -> {
+ if (schemaPartitionInfo.get(storageGroup).containsKey(new
DeviceGroupId(deviceGroupID))) {
+ deviceGroupMap.put(
+ deviceGroupID,
schemaPartitionInfo.get(storageGroup).get(deviceGroupID));
+ }
+ });
+ storageGroupMap.put(storageGroup, deviceGroupMap);
+ return storageGroupMap;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
index 476bc16..d1fade4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
@@ -33,7 +33,7 @@ public interface IPartitionFetcher {
SchemaPartitionInfo fetchSchemaPartitionInfo(String devicePath);
- SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> devicePath);
+ SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> devicePaths);
PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter);
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index d58b449..2e3fccb 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -56,9 +56,17 @@ struct GetSchemaPartitionReq {
2: required list<i32> deviceGroupIDs
}
+struct RegionReplicaSet {
+ 1: required i32 regionId
+ 2: required list<rpc.EndPoint> endpoint
+}
+
struct SchemaPartitionInfo {
- 1: required map<i32, i32> deviceGroupSchemaRegionGroupMap
- 2: required map<i32, list<i32>> SchemaRegionGroupDataNodeMap
+ 1: required map<string, map<i32, RegionReplicaSet>>
schemaRegionDataNodesMap
+}
+
+struct DataPartitionInfo {
+ 1: required map<string, map<i64, map<i32, list<RegionReplicaSet>>>>
deviceGroupStartTimeDataRegionGroupMap
}
struct GetDataPartitionReq {
@@ -66,11 +74,6 @@ struct GetDataPartitionReq {
2: required map<i32, list<i64>> deviceGroupStartTimeMap
}
-struct DataPartitionInfo {
- 1: required map<i32, map<i64, list<i32>>>
deviceGroupStartTimeDataRegionGroupMap
- 2: required map<i32, list<i32>> dataRegionGroupDataNodeMap
-}
-
struct DeviceGroupHashInfo {
1: required i32 deviceGroupCount
2: required string hashClass
@@ -131,6 +134,12 @@ service ConfigIService {
DeviceGroupHashInfo getDeviceGroupHashInfo()
+ // apply data partition when write data
+ DataPartitionInfo applyDataPartition(GetDataPartitionReq req)
+
+ // apply schema partition when create schema
+ SchemaPartitionInfo applySchemaPartition(GetSchemaPartitionReq req)
+
DataPartitionInfoResp fetchDataPartitionInfo(FetchDataPartitionReq req)
SchemaPartitionInfoResp fetchSchemaPartitionInfo(FetchSchemaPartitionReq req)