This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c96cfa7  [IOTDB-2684] refactoring confignode architecture and schema 
partition assign (#5374)
c96cfa7 is described below

commit c96cfa711d089d0661cc5553976dd0f6575ddfde
Author: wangchao316 <[email protected]>
AuthorDate: Fri Apr 1 17:50:07 2022 +0800

    [IOTDB-2684] refactoring confignode architecture and schema partition 
assign (#5374)
---
 .../iotdb/confignode/conf/ConfigNodeConfCheck.java |  28 ++-
 ...chemaDataSet.java => DataPartitionDataSet.java} |  18 +-
 .../consensus/response/SchemaPartitionDataSet.java |  79 +++++++
 .../response/StorageGroupSchemaDataSet.java        |   8 +-
 .../iotdb/confignode/manager/ConfigManager.java    | 214 +++++++++---------
 .../{ConfigManager.java => ConsensusManager.java}  |  26 +--
 .../iotdb/confignode/manager/DataNodeManager.java  | 169 ++++++++++++++
 .../apache/iotdb/confignode/manager/Manager.java   | 119 ++++++++++
 .../iotdb/confignode/manager/PartitionManager.java | 146 +++++++++++++
 .../iotdb/confignode/manager/RegionManager.java    | 152 +++++++++++++
 .../iotdb/confignode/partition/DataNodeInfo.java   |   8 +-
 .../confignode/partition/DataPartitionInfo.java    |  65 ++----
 ...{DataPartitionInfo.java => DataRegionInfo.java} |  46 ++--
 .../iotdb/confignode/partition/PartitionTable.java | 189 ----------------
 .../confignode/partition/SchemaPartitionInfo.java  |  75 +++++--
 ...emaPartitionInfo.java => SchemaRegionInfo.java} |  41 ++--
 .../partition/SchemaRegionReplicaSet.java          |  82 +++++++
 .../persistence/DataNodeInfoPersistence.java       | 186 ++++++++++++++++
 .../persistence/PartitionInfoPersistence.java      | 157 +++++++++++++
 .../persistence/RegionInfoPersistence.java         | 189 ++++++++++++++++
 .../iotdb/confignode/physical/PhysicalPlan.java    |  14 ++
 .../confignode/physical/PhysicalPlanType.java      |   6 +-
 .../confignode/physical/sys/DataPartitionPlan.java |  78 +++++++
 .../physical/sys/SchemaPartitionPlan.java          |  98 +++++++++
 .../physical/sys/SetStorageGroupPlan.java          |  30 +++
 .../confignode/service/balancer/LoadBalancer.java  |  12 -
 .../confignode/service/executor/PlanExecutor.java  |  32 ++-
 .../server/ConfigNodeRPCServerProcessor.java       |  61 ++++--
 .../confignode/util/SerializeDeserializeUtil.java  | 242 +++++++++++++++++++++
 .../hash/DeviceGroupHashExecutorManualTest.java    |   4 +-
 .../server/ConfigNodeRPCServerProcessorTest.java   | 151 +++++++++++++
 .../utils/SerializeDeserializeUtilTest.java        |  90 ++++++++
 .../apache/iotdb/consensus/common/Endpoint.java    |  24 +-
 .../commons/partition/SchemaPartitionInfo.java     |  17 ++
 .../db/mpp/sql/analyze/IPartitionFetcher.java      |   2 +-
 .../src/main/thrift/confignode.thrift              |  23 +-
 36 files changed, 2349 insertions(+), 532 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
index a6e0f9c..2a8c495 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
@@ -52,16 +52,11 @@ public class ConfigNodeConfCheck {
   public void checkConfig() throws ConfigurationException, IOException, 
StartupException {
     // If systemDir does not exist, create systemDir
     File systemDir = new File(conf.getSystemDir());
-    if (!systemDir.exists()) {
-      if (systemDir.mkdirs()) {
-        LOGGER.info("Make system dirs: {}", systemDir);
-      } else {
-        throw new IOException(
-            String.format(
-                "Start ConfigNode failed, because couldn't make system dirs: 
%s.",
-                systemDir.getAbsolutePath()));
-      }
-    }
+    createDir(systemDir);
+
+    // If consensusDir does not exist, create consensusDir
+    File consensusDir = new File(conf.getConsensusDir());
+    createDir(consensusDir);
 
     File specialPropertiesFile =
         new File(conf.getSystemDir() + File.separator + 
ConfigNodeConstant.SPECIAL_CONF_NAME);
@@ -90,6 +85,19 @@ public class ConfigNodeConfCheck {
     }
   }
 
+  private void createDir(File dir) throws IOException {
+    if (!dir.exists()) {
+      if (dir.mkdirs()) {
+        LOGGER.info("Make dirs: {}", dir);
+      } else {
+        throw new IOException(
+            String.format(
+                "Start ConfigNode failed, because couldn't make system dirs: 
%s.",
+                dir.getAbsolutePath()));
+      }
+    }
+  }
+
   /**
    * There are some special parameters that can't be changed after once we 
start ConfigNode.
    * Therefore, store them in iotdb-confignode-special.properties at the first 
startup
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
similarity index 70%
copy from 
confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
index a523420..8c8c811 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
@@ -16,22 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.consensus.response;
 
-import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
 import org.apache.iotdb.consensus.common.DataSet;
 
-import java.util.List;
-
-public class StorageGroupSchemaDataSet implements DataSet {
-
-  private final List<StorageGroupSchema> schemaList;
+public class DataPartitionDataSet implements DataSet {
+  private DataPartitionInfo dataPartitionInfo;
 
-  public StorageGroupSchemaDataSet(List<StorageGroupSchema> schemaList) {
-    this.schemaList = schemaList;
+  public DataPartitionInfo getDataPartitionInfo() {
+    return dataPartitionInfo;
   }
 
-  public List<StorageGroupSchema> getSchemaList() {
-    return schemaList;
+  public void setDataPartitionInfos(DataPartitionInfo dataPartitionInfo) {
+    this.dataPartitionInfo = dataPartitionInfo;
   }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
new file mode 100644
index 0000000..0a0acbe
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.response;
+
+import org.apache.iotdb.confignode.partition.SchemaPartitionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.RegionReplicaSet;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaPartitionDataSet implements DataSet {
+  private SchemaPartitionInfo schemaPartitionInfo;
+
+  public SchemaPartitionInfo getSchemaPartitionInfo() {
+    return schemaPartitionInfo;
+  }
+
+  public void setSchemaPartitionInfo(SchemaPartitionInfo schemaPartitionInfos) 
{
+    this.schemaPartitionInfo = schemaPartitionInfos;
+  }
+
+  public static org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo
+      convertRpcSchemaPartition(SchemaPartitionInfo schemaPartitionInfo) {
+    org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo 
rpcSchemaPartitionInfo =
+        new org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo();
+
+    Map<String, Map<Integer, RegionReplicaSet>> schemaRegionRelicatSets = new 
HashMap<>();
+
+    schemaPartitionInfo.getSchemaPartitionInfo().entrySet().stream()
+        .forEach(
+            entity -> {
+              schemaRegionRelicatSets.putIfAbsent(entity.getKey(), new 
HashMap<>());
+              entity
+                  .getValue()
+                  .entrySet()
+                  .forEach(
+                      replica -> {
+                        RegionReplicaSet regionReplicaSet = new 
RegionReplicaSet();
+                        regionReplicaSet.setRegionId(replica.getKey());
+                        List<EndPoint> endPoints = new ArrayList<>();
+                        replica
+                            .getValue()
+                            .getEndPointList()
+                            .forEach(
+                                point -> {
+                                  EndPoint endPoint = new 
EndPoint(point.getIp(), point.getPort());
+                                  endPoints.add(endPoint);
+                                });
+                        regionReplicaSet.setEndpoint(endPoints);
+                        schemaRegionRelicatSets
+                            .get(entity.getKey())
+                            .put(replica.getKey(), regionReplicaSet);
+                      });
+            });
+    
rpcSchemaPartitionInfo.setSchemaRegionDataNodesMap(schemaRegionRelicatSets);
+    return rpcSchemaPartitionInfo;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
index a523420..934f050 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
@@ -25,7 +25,9 @@ import java.util.List;
 
 public class StorageGroupSchemaDataSet implements DataSet {
 
-  private final List<StorageGroupSchema> schemaList;
+  private List<StorageGroupSchema> schemaList;
+
+  public StorageGroupSchemaDataSet() {}
 
   public StorageGroupSchemaDataSet(List<StorageGroupSchema> schemaList) {
     this.schemaList = schemaList;
@@ -34,4 +36,8 @@ public class StorageGroupSchemaDataSet implements DataSet {
   public List<StorageGroupSchema> getSchemaList() {
     return schemaList;
   }
+
+  public void setSchemaList(List<StorageGroupSchema> schemaList) {
+    this.schemaList = schemaList;
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 1fd5767..d533f06 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -16,154 +16,134 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.manager;
 
-import org.apache.iotdb.commons.hash.DeviceGroupHashExecutor;
 import org.apache.iotdb.confignode.conf.ConfigNodeConf;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import 
org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
+import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.ConsensusGroupId;
-import org.apache.iotdb.consensus.common.Endpoint;
-import org.apache.iotdb.consensus.common.GroupType;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.ratis.RatisConsensus;
-import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * ConfigManager maintains consistency between PartitionTables in the 
ConfigNodeGroup. Expose the
- * query interface for the PartitionTable
- */
-public class ConfigManager {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigManager.class);
+/** Entry point of all managers: ConsensusManager, DataNodeManager, PartitionManager, RegionManager. */
+public class ConfigManager implements Manager {
   private static final ConfigNodeConf conf = 
ConfigNodeDescriptor.getInstance().getConf();
+  private static final TSStatus ERROR_TSSTATUS =
+      new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+
+  /** manage consensus, write or read consensus */
+  private final ConsensusManager consensusManager;
+
+  /** manage data node */
+  private final DataNodeManager dataNodeManager;
 
-  private IConsensus consensusImpl;
-  private ConsensusGroupId consensusGroupId;
+  /** manage assignment of data partitions and schema partitions */
+  private final PartitionManager partitionManager;
 
-  private DeviceGroupHashExecutor hashExecutor;
+  /** manage assignment of schema regions and data regions */
+  private final RegionManager regionManager;
 
   public ConfigManager() throws IOException {
-    setHashExecutor();
-    setConsensusLayer();
+    this.dataNodeManager = new DataNodeManager(this);
+    this.partitionManager = new PartitionManager(this);
+    this.regionManager = new RegionManager(this);
+    this.consensusManager = new ConsensusManager();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return false;
+  }
+
+  @Override
+  public TSStatus registerDataNode(PhysicalPlan physicalPlan) {
+    if (physicalPlan instanceof RegisterDataNodePlan) {
+      return dataNodeManager.registerDataNode((RegisterDataNodePlan) 
physicalPlan);
+    }
+    return ERROR_TSSTATUS;
   }
 
-  /** Build DeviceGroupHashExecutor */
-  private void setHashExecutor() {
-    try {
-      Class<?> executor = 
Class.forName(conf.getDeviceGroupHashExecutorClass());
-      Constructor<?> executorConstructor = executor.getConstructor(int.class);
-      hashExecutor =
-          (DeviceGroupHashExecutor) 
executorConstructor.newInstance(conf.getDeviceGroupCount());
-    } catch (ClassNotFoundException
-        | NoSuchMethodException
-        | InstantiationException
-        | IllegalAccessException
-        | InvocationTargetException e) {
-      LOGGER.error(
-          "Couldn't Constructor DeviceGroupHashExecutor class: {}",
-          conf.getDeviceGroupHashExecutorClass(),
-          e);
-      hashExecutor = null;
+  @Override
+  public DataSet getDataNodeInfo(PhysicalPlan physicalPlan) {
+    if (physicalPlan instanceof QueryDataNodeInfoPlan) {
+      return dataNodeManager.getDataNodeInfo((QueryDataNodeInfoPlan) 
physicalPlan);
     }
+    return new DataNodesInfoDataSet();
   }
 
-  public int getDeviceGroupID(String device) {
-    return hashExecutor.getDeviceGroupID(device);
+  @Override
+  public DataSet getStorageGroupSchema() {
+    return regionManager.getStorageGroupSchema();
   }
 
-  /** Build ConfigNodeGroup ConsensusLayer */
-  private void setConsensusLayer() throws IOException {
-    // There is only one ConfigNodeGroup
-    consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
-
-    // If consensusDir does not exist, create consensusDir
-    File consensusDir = new File(conf.getConsensusDir());
-    if (!consensusDir.exists()) {
-      if (consensusDir.mkdirs()) {
-        LOGGER.info("Make consensus dirs: {}", consensusDir);
-      } else {
-        throw new IOException(
-            String.format(
-                "Start ConfigNode failed, because couldn't make system dirs: 
%s.",
-                consensusDir.getAbsolutePath()));
-      }
+  @Override
+  public TSStatus setStorageGroup(PhysicalPlan physicalPlan) {
+    if (physicalPlan instanceof SetStorageGroupPlan) {
+      return regionManager.setStorageGroup((SetStorageGroupPlan) physicalPlan);
     }
+    return ERROR_TSSTATUS;
+  }
 
-    // Implement specific consensus
-    switch (conf.getConsensusType()) {
-      case STANDALONE:
-        constructStandAloneConsensus();
-        break;
-      case RATIS:
-        constructRatisConsensus();
-        break;
-      default:
-        throw new IllegalArgumentException(
-            "Start ConfigNode failed, unrecognized ConsensusType: "
-                + conf.getConsensusType().getTypeName());
+  @Override
+  public DataNodeManager getDataNodeManager() {
+    return dataNodeManager;
+  }
+
+  @Override
+  public DataSet getDataPartition(PhysicalPlan physicalPlan) {
+    if (physicalPlan instanceof DataPartitionPlan) {
+      return partitionManager.getDataPartition((DataPartitionPlan) 
physicalPlan);
     }
+    return new DataNodesInfoDataSet();
   }
 
-  private void constructStandAloneConsensus() throws IOException {
-    // Standalone consensus
-    consensusImpl = new StandAloneConsensus(id -> new 
PartitionRegionStateMachine());
-    consensusImpl.start();
-
-    // Standalone ConsensusGroup
-    consensusImpl.addConsensusGroup(
-        consensusGroupId,
-        Collections.singletonList(
-            new Peer(
-                consensusGroupId, new Endpoint(conf.getRpcAddress(), 
conf.getInternalPort()))));
+  @Override
+  public DataSet getSchemaPartition(PhysicalPlan physicalPlan) {
+    if (physicalPlan instanceof SchemaPartitionPlan) {
+      return partitionManager.getSchemaPartition((SchemaPartitionPlan) 
physicalPlan);
+    }
+    return new DataNodesInfoDataSet();
   }
 
-  private void constructRatisConsensus() throws IOException {
-    // Ratis consensus local implement
-    consensusImpl =
-        RatisConsensus.newBuilder()
-            .setEndpoint(new Endpoint(conf.getRpcAddress(), 
conf.getInternalPort()))
-            .setStateMachineRegistry(id -> new PartitionRegionStateMachine())
-            .setStorageDir(new File(conf.getConsensusDir()))
-            .build();
-    consensusImpl.start();
-
-    // Build ratis group from user properties
-    LOGGER.info(
-        "Set ConfigNode consensus group {}...",
-        Arrays.toString(conf.getConfigNodeGroupAddressList()));
-    List<Peer> peerList = new ArrayList<>();
-    for (Endpoint endpoint : conf.getConfigNodeGroupAddressList()) {
-      peerList.add(new Peer(consensusGroupId, endpoint));
+  @Override
+  public RegionManager getRegionManager() {
+    return regionManager;
+  }
+
+  @Override
+  public DataSet applySchemaPartition(PhysicalPlan physicalPlan) {
+    if (physicalPlan instanceof SchemaPartitionPlan) {
+      return partitionManager.applySchemaPartition((SchemaPartitionPlan) 
physicalPlan);
     }
-    consensusImpl.addConsensusGroup(consensusGroupId, peerList);
+    return new DataNodesInfoDataSet();
   }
 
-  /** Transmit PhysicalPlan to confignode.consensus.statemachine */
-  public ConsensusWriteResponse write(PhysicalPlan plan) {
-    return consensusImpl.write(consensusGroupId, plan);
+  @Override
+  public DataSet applyDataPartition(PhysicalPlan physicalPlan) {
+    if (physicalPlan instanceof DataPartitionPlan) {
+      return partitionManager.applyDataPartition((DataPartitionPlan) 
physicalPlan);
+    }
+    return new DataNodesInfoDataSet();
   }
 
-  /** Transmit PhysicalPlan to confignode.consensus.statemachine */
-  public ConsensusReadResponse read(PhysicalPlan plan) {
-    return consensusImpl.read(consensusGroupId, plan);
+  @Override
+  public DeviceGroupHashInfo getDeviceGroupHashInfo() {
+    return new DeviceGroupHashInfo(
+        conf.getDeviceGroupCount(), conf.getDeviceGroupHashExecutorClass());
   }
 
-  // TODO: Interfaces for LoadBalancer control
+  @Override
+  public ConsensusManager getConsensusManager() {
+    return consensusManager;
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
similarity index 88%
copy from 
confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 1fd5767..0b8d4ee 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -45,21 +45,18 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-/**
- * ConfigManager maintains consistency between PartitionTables in the 
ConfigNodeGroup. Expose the
- * query interface for the PartitionTable
- */
-public class ConfigManager {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigManager.class);
+/** ConsensusManager maintains the consensus implementation; requests are redirected to the 
consensus layer */
+public class ConsensusManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsensusManager.class);
   private static final ConfigNodeConf conf = 
ConfigNodeDescriptor.getInstance().getConf();
 
   private IConsensus consensusImpl;
+
   private ConsensusGroupId consensusGroupId;
 
   private DeviceGroupHashExecutor hashExecutor;
 
-  public ConfigManager() throws IOException {
+  public ConsensusManager() throws IOException {
     setHashExecutor();
     setConsensusLayer();
   }
@@ -93,19 +90,6 @@ public class ConfigManager {
     // There is only one ConfigNodeGroup
     consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
 
-    // If consensusDir does not exist, create consensusDir
-    File consensusDir = new File(conf.getConsensusDir());
-    if (!consensusDir.exists()) {
-      if (consensusDir.mkdirs()) {
-        LOGGER.info("Make consensus dirs: {}", consensusDir);
-      } else {
-        throw new IOException(
-            String.format(
-                "Start ConfigNode failed, because couldn't make system dirs: 
%s.",
-                consensusDir.getAbsolutePath()));
-      }
-    }
-
     // Implement specific consensus
     switch (conf.getConsensusType()) {
       case STANDALONE:
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
new file mode 100644
index 0000000..83f19ef
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
+import org.apache.iotdb.confignode.partition.DataNodeInfo;
+import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
+import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** Manages server info of data nodes, such as adding or removing a node */
+public class DataNodeManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNodeManager.class);
+
+  private DataNodeInfoPersistence dataNodeInfo = 
DataNodeInfoPersistence.getInstance();
+
+  private Manager configManager;
+
+  /** TODO: do some operations after adding or removing a node */
+  private List<ChangeServerListener> listeners = new CopyOnWriteArrayList<>();
+
+  private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
+
+  private int nextDataNodeId;
+
+  public DataNodeManager(Manager configManager) {
+    this.configManager = configManager;
+    this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
+  }
+
+  /**
+   * register data node info when a data node starts
+   *
+   * @param plan RegisterDataNodePlan
+   * @return SUCCESS_STATUS if the data node is already registered, otherwise the status of the consensus write
+   */
+  public TSStatus registerDataNode(RegisterDataNodePlan plan) {
+    TSStatus result;
+    DataNodeInfo info = plan.getInfo();
+    dataNodeInfoReadWriteLock.writeLock().lock();
+    try {
+      if (dataNodeInfo.containsValue(info)) {
+        // TODO: optimize
+        result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        result.setMessage(String.valueOf(dataNodeInfo.getDataNodeInfo(info)));
+      } else {
+        info.setDataNodeID(nextDataNodeId);
+        ConsensusWriteResponse consensusWriteResponse = 
getConsensusManager().write(plan);
+        nextDataNodeId += 1;
+        return consensusWriteResponse.getStatus();
+      }
+    } finally {
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+    }
+    return result;
+  }
+
+  /**
+   * get data node info
+   *
+   * @param plan QueryDataNodeInfoPlan
+   * @return all data node info if dataNodeId of plan is -1
+   */
+  public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
+    return (DataNodesInfoDataSet) 
getConsensusManager().read(plan).getDataset();
+  }
+
+  public Set<Integer> getDataNodeId() {
+    return dataNodeInfo.getDataNodeIds();
+  }
+
+  private ConsensusManager getConsensusManager() {
+    return configManager.getConsensusManager();
+  }
+
+  public void registerListener(final ChangeServerListener serverListener) {
+    listeners.add(serverListener);
+  }
+
+  public boolean unregisterListener(final ChangeServerListener serverListener) 
{
+    return listeners.remove(serverListener);
+  }
+
+  /** TODO: wait for data nodes to register */
+  public void waitForDataNodes() {
+    listeners.stream().forEach(serverListener -> serverListener.waiting());
+  }
+
+  public Map<Integer, DataNodeInfo> getOnlineDataNodes() {
+    return dataNodeInfo.getOnlineDataNodes();
+  }
+
+  private class ServerStartListenerThread extends Thread implements 
ChangeServerListener {
+    private boolean changed = false;
+
+    ServerStartListenerThread() {
+      setDaemon(true);
+    }
+
+    @Override
+    public void addDataNode(DataNodeInfo DataNodeInfo) {
+      serverChanged();
+    }
+
+    @Override
+    public void removeDataNode(DataNodeInfo dataNodeInfo) {
+      serverChanged();
+    }
+
+    private synchronized void serverChanged() {
+      changed = true;
+      this.notify();
+    }
+
+    @Override
+    public void run() {
+      while (!configManager.isStopped()) {}
+    }
+  }
+
+  /** TODO: listener for adding or removing a data node */
+  public interface ChangeServerListener {
+
+    /** Started waiting on DataNode to check */
+    default void waiting() {};
+
+    /**
+     * The server has joined the cluster
+     *
+     * @param dataNodeInfo datanode info
+     */
+    void addDataNode(final DataNodeInfo dataNodeInfo);
+
+    /**
+     * remove data node
+     *
+     * @param dataNodeInfo data node info
+     */
+    void removeDataNode(final DataNodeInfo dataNodeInfo);
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
new file mode 100644
index 0000000..a0228f1
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+/**
+ * A subset of the services provided by {@link ConfigManager}. For internal use only; passed to
+ * the individual managers and services.
+ */
+public interface Manager {
+
+  /**
+   * Reports whether the service has stopped.
+   *
+   * @return true if the service is stopped
+   */
+  boolean isStopped();
+
+  /**
+   * Registers a data node.
+   *
+   * @param physicalPlan physical plan
+   * @return status
+   */
+  TSStatus registerDataNode(PhysicalPlan physicalPlan);
+
+  /**
+   * Gets data node info.
+   *
+   * @param physicalPlan physical plan
+   * @return data set
+   */
+  DataSet getDataNodeInfo(PhysicalPlan physicalPlan);
+
+  /**
+   * Gets the storage group schema.
+   *
+   * @return data set
+   */
+  DataSet getStorageGroupSchema();
+
+  /**
+   * Sets a storage group.
+   *
+   * @param physicalPlan physical plan
+   * @return status
+   */
+  TSStatus setStorageGroup(PhysicalPlan physicalPlan);
+
+  /**
+   * Gets the data node manager.
+   *
+   * @return DataNodeManager instance
+   */
+  DataNodeManager getDataNodeManager();
+
+  /**
+   * Gets a data partition.
+   *
+   * @param physicalPlan physical plan
+   * @return data set
+   */
+  DataSet getDataPartition(PhysicalPlan physicalPlan);
+
+  /**
+   * Gets a schema partition.
+   *
+   * @param physicalPlan physical plan
+   * @return data set
+   */
+  DataSet getSchemaPartition(PhysicalPlan physicalPlan);
+
+  /**
+   * Gets the region manager.
+   *
+   * @return RegionManager instance
+   */
+  RegionManager getRegionManager();
+
+  /**
+   * Applies for a schema partition.
+   *
+   * @param physicalPlan physical plan
+   * @return data set
+   */
+  DataSet applySchemaPartition(PhysicalPlan physicalPlan);
+
+  /**
+   * Applies for a data partition.
+   *
+   * @param physicalPlan physical plan
+   * @return data set
+   */
+  DataSet applyDataPartition(PhysicalPlan physicalPlan);
+
+  /**
+   * Gets the device group hash info.
+   *
+   * @return DeviceGroupHashInfo instance
+   */
+  DeviceGroupHashInfo getDeviceGroupHashInfo();
+
+  /**
+   * Gets the consensus manager.
+   *
+   * @return ConsensusManager instance
+   */
+  ConsensusManager getConsensusManager();
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
new file mode 100644
index 0000000..70dc6a9
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
+import org.apache.iotdb.confignode.partition.DataPartitionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionReplicaSet;
+import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
+import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** Manages schema partitions; data partition handling is not implemented yet. */
+public class PartitionManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
+
+  /** schema partition read write lock */
+  private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
+
+  /** data partition read write lock */
+  private final ReentrantReadWriteLock dataPartitionReadWriteLock;
+
+  // TODO: Serialize and Deserialize
+  private final DataPartitionInfo dataPartition;
+
+  private final Manager configNodeManager;
+
+  public PartitionManager(Manager configNodeManager) {
+    this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
+    this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
+    this.configNodeManager = configNodeManager;
+    this.dataPartition = new DataPartitionInfo();
+  }
+
+  /**
+   * Reads the schema partition through the consensus layer while holding the schema read lock.
+   *
+   * @param physicalPlan storageGroup and deviceGroupIDs
+   * @return the data set carried by the consensus read response, cast to
+   *     {@link SchemaPartitionDataSet}
+   */
+  public DataSet getSchemaPartition(SchemaPartitionPlan physicalPlan) {
+    schemaPartitionReadWriteLock.readLock().lock();
+    try {
+      ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
+      return (SchemaPartitionDataSet) consensusReadResponse.getDataset();
+    } finally {
+      schemaPartitionReadWriteLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Allocates schema partitions for the device groups that have none yet, writes the
+   * allocation through the consensus layer, and then returns the schema partition.
+   *
+   * @param physicalPlan storage group and device group id
+   * @return Schema Partition data set
+   */
+  public DataSet applySchemaPartition(SchemaPartitionPlan physicalPlan) {
+    String storageGroup = physicalPlan.getStorageGroup();
+    List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
+
+    // allocate partition by storage group and device group id
+    schemaPartitionReadWriteLock.writeLock().lock();
+    try {
+      // The unassigned device groups are determined while the write lock is held so that
+      // two concurrent callers cannot both see the same group as unassigned and allocate
+      // it twice.
+      List<Integer> noAssignDeviceGroupId =
+          PartitionInfoPersistence.getInstance()
+              .filterSchemaRegionNoAssignDeviceGroupId(storageGroup, deviceGroupIDs);
+      Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets =
+          allocateSchemaPartition(storageGroup, noAssignDeviceGroupId);
+      physicalPlan.setDeviceGroupIdReplicaSet(deviceGroupIdReplicaSets);
+      getConsensusManager().write(physicalPlan);
+      LOGGER.info("Allocate schema partition to {}.", deviceGroupIdReplicaSets);
+    } finally {
+      schemaPartitionReadWriteLock.writeLock().unlock();
+    }
+
+    return getSchemaPartition(physicalPlan);
+  }
+
+  /**
+   * Assigns each device group a randomly chosen schema region replica set.
+   *
+   * <p>TODO: allocate schema partition by balancer
+   *
+   * @param storageGroup storage group (currently unused by the random policy)
+   * @param deviceGroupIDs device group id list
+   * @return map from device group id to the replica set chosen for it
+   */
+  private Map<Integer, SchemaRegionReplicaSet> allocateSchemaPartition(
+      String storageGroup, List<Integer> deviceGroupIDs) {
+    List<SchemaRegionReplicaSet> schemaRegionEndPoints =
+        RegionInfoPersistence.getInstance().getSchemaRegionEndPoint();
+    Random random = new Random();
+    Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets = new HashMap<>();
+    // NOTE(review): if no schema region exists while device groups need one,
+    // Random.nextInt(0) throws IllegalArgumentException - confirm callers create regions first.
+    for (Integer deviceGroupId : deviceGroupIDs) {
+      SchemaRegionReplicaSet schemaRegionReplicaSet =
+          schemaRegionEndPoints.get(random.nextInt(schemaRegionEndPoints.size()));
+      deviceGroupIdReplicaSets.put(deviceGroupId, schemaRegionReplicaSet);
+    }
+    return deviceGroupIdReplicaSets;
+  }
+
+  private ConsensusManager getConsensusManager() {
+    return configNodeManager.getConsensusManager();
+  }
+
+  /**
+   * Not implemented yet; currently always returns null.
+   *
+   * <p>TODO: allocate data partition by balancer
+   *
+   * @param physicalPlan physical plan
+   * @return data set (currently null)
+   */
+  public DataSet applyDataPartition(DataPartitionPlan physicalPlan) {
+    return null;
+  }
+
+  /**
+   * Not implemented yet; currently always returns null.
+   *
+   * @param physicalPlan physical plan
+   * @return data set (currently null)
+   */
+  public DataSet getDataPartition(DataPartitionPlan physicalPlan) {
+    return null;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
new file mode 100644
index 0000000..5e11594
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import 
org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
+import org.apache.iotdb.confignode.partition.DataRegionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
+import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
+import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** manage data partition and schema partition */
+public class RegionManager {
+  private static final ConfigNodeConf conf = 
ConfigNodeDescriptor.getInstance().getConf();
+  private static final int regionReplicaCount = conf.getRegionReplicaCount();
+  private static final int schemaRegionCount = conf.getSchemaRegionCount();
+  private static final int dataRegionCount = conf.getDataRegionCount();
+
+  /** partition read write lock */
+  private final ReentrantReadWriteLock partitionReadWriteLock;
+
+  // TODO: Serialize and Deserialize
+  private int nextSchemaRegionGroup = 0;
+  // TODO: Serialize and Deserialize
+  private int nextDataRegionGroup = 0;
+
+  private RegionInfoPersistence regionInfoPersistence = 
RegionInfoPersistence.getInstance();
+
+  private final Manager configNodeManager;
+
+  public RegionManager(Manager configNodeManager) {
+    this.partitionReadWriteLock = new ReentrantReadWriteLock();
+    this.configNodeManager = configNodeManager;
+  }
+
+  private ConsensusManager getConsensusManager() {
+    return configNodeManager.getConsensusManager();
+  }
+
+  /**
+   * 1. region allocation 2. add to storage group map
+   *
+   * @param plan SetStorageGroupPlan
+   * @return TSStatusCode.SUCCESS_STATUS if region allocate
+   */
+  public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
+    TSStatus result;
+    partitionReadWriteLock.writeLock().lock();
+    try {
+      if (configNodeManager.getDataNodeManager().getDataNodeId().size() < 
regionReplicaCount) {
+        result = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+        result.setMessage("DataNode is not enough, please register more.");
+      } else {
+        if 
(regionInfoPersistence.containsStorageGroup(plan.getSchema().getName())) {
+          result = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+          result.setMessage(
+              String.format("StorageGroup %s is already set.", 
plan.getSchema().getName()));
+        } else {
+          String storageGroupName = plan.getSchema().getName();
+          StorageGroupSchema storageGroupSchema = new 
StorageGroupSchema(storageGroupName);
+
+          // allocate schema region
+          SchemaRegionInfo schemaRegionInfo = 
schemaRegionAllocation(storageGroupSchema);
+          plan.setSchemaRegionInfo(schemaRegionInfo);
+
+          // allocate data region
+          DataRegionInfo dataRegionInfo = 
dataRegionAllocation(storageGroupSchema);
+          plan.setDataRegionInfo(dataRegionInfo);
+
+          // write consensus
+          result = getConsensusManager().write(plan).getStatus();
+        }
+      }
+    } finally {
+      partitionReadWriteLock.writeLock().unlock();
+    }
+    return result;
+  }
+
+  private DataNodeManager getDataNodeInfoManager() {
+    return configNodeManager.getDataNodeManager();
+  }
+
+  private SchemaRegionInfo schemaRegionAllocation(StorageGroupSchema 
storageGroupSchema) {
+
+    SchemaRegionInfo schemaRegionInfo = new SchemaRegionInfo();
+    // TODO: Use CopySet algorithm to optimize region allocation policy
+    for (int i = 0; i < schemaRegionCount; i++) {
+      List<Integer> dataNodeList = new 
ArrayList<>(getDataNodeInfoManager().getDataNodeId());
+      Collections.shuffle(dataNodeList);
+      schemaRegionInfo.addSchemaRegion(
+          nextSchemaRegionGroup, dataNodeList.subList(0, regionReplicaCount));
+      storageGroupSchema.addSchemaRegionGroup(nextSchemaRegionGroup);
+      nextSchemaRegionGroup += 1;
+    }
+    return schemaRegionInfo;
+  }
+
+  /**
+   * TODO: Only perform in leader node, @rongzhao
+   *
+   * @param storageGroupSchema
+   */
+  private DataRegionInfo dataRegionAllocation(StorageGroupSchema 
storageGroupSchema) {
+    // TODO: Use CopySet algorithm to optimize region allocation policy
+    DataRegionInfo dataRegionInfo = new DataRegionInfo();
+    for (int i = 0; i < dataRegionCount; i++) {
+      List<Integer> dataNodeList = new 
ArrayList<>(getDataNodeInfoManager().getDataNodeId());
+      Collections.shuffle(dataNodeList);
+      dataRegionInfo.createDataRegion(
+          nextDataRegionGroup, dataNodeList.subList(0, regionReplicaCount));
+      storageGroupSchema.addDataRegionGroup(nextDataRegionGroup);
+      nextDataRegionGroup += 1;
+    }
+    return dataRegionInfo;
+  }
+
+  public StorageGroupSchemaDataSet getStorageGroupSchema() {
+
+    ConsensusReadResponse readResponse =
+        getConsensusManager().read(new QueryStorageGroupSchemaPlan());
+    return (StorageGroupSchemaDataSet) readResponse.getDataset();
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
index 52134c6..9e78f16 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
@@ -34,6 +34,8 @@ public class DataNodeInfo {
   public DataNodeInfo(int dataNodeID, Endpoint endPoint) {
     this.dataNodeID = dataNodeID;
     this.endPoint = endPoint;
+    dataRegionGroupIDs = new ArrayList<>();
+    schemaRegionGroupIDs = new ArrayList<>();
   }
 
   public int getDataNodeID() {
@@ -49,9 +51,6 @@ public class DataNodeInfo {
   }
 
   public void addSchemaRegionGroup(int id) {
-    if (schemaRegionGroupIDs == null) {
-      schemaRegionGroupIDs = new ArrayList<>();
-    }
     schemaRegionGroupIDs.add(id);
   }
 
@@ -60,9 +59,6 @@ public class DataNodeInfo {
   }
 
   public void addDataRegionGroup(int id) {
-    if (dataRegionGroupIDs == null) {
-      dataRegionGroupIDs = new ArrayList<>();
-    }
     dataRegionGroupIDs.add(id);
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
index 1ccbfa3..69aac80 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -18,63 +18,26 @@
  */
 package org.apache.iotdb.confignode.partition;
 
-import java.util.ArrayList;
-import java.util.HashMap;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.TimePartitionId;
+
 import java.util.List;
 import java.util.Map;
 
 public class DataPartitionInfo {
 
-  // TODO: Serialize and Deserialize
-  // Map<StorageGroup, Map<DeviceGroupID, Map<TimeInterval, 
List<DataRegionID>>>>
-  private final Map<String, Map<Integer, Map<Long, List<Integer>>>> 
dataPartitionTable;
-  // TODO: Serialize and Deserialize
-  // Map<DataRegionID, List<DataNodeID>>
-  private final Map<Integer, List<Integer>> dataRegionDataNodesMap;
-
-  // Map<StorageGroup, Map<DeviceGroupID, DataPartitionRule>>
-  private final Map<String, Map<Integer, DataPartitionRule>> 
dataPartitionRuleTable;
-
-  public DataPartitionInfo() {
-    this.dataPartitionTable = new HashMap<>();
-    this.dataRegionDataNodesMap = new HashMap<>();
-
-    this.dataPartitionRuleTable = new HashMap<>();
-  }
-
-  public void createDataPartition(
-      String storageGroup, int deviceGroup, long timeInterval, int 
dataRegionGroup) {
-    if (!dataPartitionTable.containsKey(storageGroup)) {
-      dataPartitionTable.put(storageGroup, new HashMap<>());
-    }
-    if (!dataPartitionTable.get(storageGroup).containsKey(deviceGroup)) {
-      dataPartitionTable.get(storageGroup).put(deviceGroup, new HashMap<>());
-    }
-    if 
(!dataPartitionTable.get(storageGroup).get(deviceGroup).containsKey(timeInterval))
 {
-      dataPartitionTable.get(storageGroup).get(deviceGroup).put(timeInterval, 
new ArrayList<>());
-    }
-    
dataPartitionTable.get(storageGroup).get(deviceGroup).get(timeInterval).add(dataRegionGroup);
-  }
-
-  public List<Integer> getDataPartition(String storageGroup, int deviceGroup, 
long timeInterval) {
-    if (dataPartitionTable.containsKey(storageGroup)) {
-      if (dataPartitionTable.get(storageGroup).containsKey(deviceGroup)) {
-        return 
dataPartitionTable.get(storageGroup).get(deviceGroup).get(timeInterval);
-      }
-    }
-    return null;
-  }
-
-  public void createDataRegion(int dataRegionGroup, List<Integer> 
dataNodeList) {
-    dataRegionDataNodesMap.put(dataRegionGroup, dataNodeList);
-  }
+  // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, 
List<DataRegionPlaceInfo>>>>
+  private Map<String, Map<Integer, Map<TimePartitionId, 
List<DataRegionReplicaSet>>>>
+      dataPartitionMap;
 
-  public List<Integer> getDataRegionLocation(int dataRegionGroup) {
-    return dataRegionDataNodesMap.get(dataRegionGroup);
+  public Map<String, Map<Integer, Map<TimePartitionId, 
List<DataRegionReplicaSet>>>>
+      getDataPartitionMap() {
+    return dataPartitionMap;
   }
 
-  public void updateDataPartitionRule(
-      String StorageGroup, int deviceGroup, DataPartitionRule rule) {
-    // TODO: Data partition policy by @YongzaoDan
+  public void setDataPartitionMap(
+      Map<String, Map<Integer, Map<TimePartitionId, 
List<DataRegionReplicaSet>>>>
+          dataPartitionMap) {
+    this.dataPartitionMap = dataPartitionMap;
   }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
similarity index 54%
copy from 
confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
index 1ccbfa3..9441d14 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataPartitionInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
@@ -18,51 +18,29 @@
  */
 package org.apache.iotdb.confignode.partition;
 
-import java.util.ArrayList;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class DataPartitionInfo {
+public class DataRegionInfo {
 
   // TODO: Serialize and Deserialize
-  // Map<StorageGroup, Map<DeviceGroupID, Map<TimeInterval, 
List<DataRegionID>>>>
-  private final Map<String, Map<Integer, Map<Long, List<Integer>>>> 
dataPartitionTable;
-  // TODO: Serialize and Deserialize
   // Map<DataRegionID, List<DataNodeID>>
-  private final Map<Integer, List<Integer>> dataRegionDataNodesMap;
+  private Map<Integer, List<Integer>> dataRegionDataNodesMap;
 
   // Map<StorageGroup, Map<DeviceGroupID, DataPartitionRule>>
   private final Map<String, Map<Integer, DataPartitionRule>> 
dataPartitionRuleTable;
 
-  public DataPartitionInfo() {
-    this.dataPartitionTable = new HashMap<>();
+  public DataRegionInfo() {
     this.dataRegionDataNodesMap = new HashMap<>();
-
     this.dataPartitionRuleTable = new HashMap<>();
   }
 
-  public void createDataPartition(
-      String storageGroup, int deviceGroup, long timeInterval, int 
dataRegionGroup) {
-    if (!dataPartitionTable.containsKey(storageGroup)) {
-      dataPartitionTable.put(storageGroup, new HashMap<>());
-    }
-    if (!dataPartitionTable.get(storageGroup).containsKey(deviceGroup)) {
-      dataPartitionTable.get(storageGroup).put(deviceGroup, new HashMap<>());
-    }
-    if 
(!dataPartitionTable.get(storageGroup).get(deviceGroup).containsKey(timeInterval))
 {
-      dataPartitionTable.get(storageGroup).get(deviceGroup).put(timeInterval, 
new ArrayList<>());
-    }
-    
dataPartitionTable.get(storageGroup).get(deviceGroup).get(timeInterval).add(dataRegionGroup);
-  }
-
-  public List<Integer> getDataPartition(String storageGroup, int deviceGroup, 
long timeInterval) {
-    if (dataPartitionTable.containsKey(storageGroup)) {
-      if (dataPartitionTable.get(storageGroup).containsKey(deviceGroup)) {
-        return 
dataPartitionTable.get(storageGroup).get(deviceGroup).get(timeInterval);
-      }
-    }
-    return null;
+  public Map<Integer, List<Integer>> getDataRegionDataNodesMap() {
+    return dataRegionDataNodesMap;
   }
 
   public void createDataRegion(int dataRegionGroup, List<Integer> 
dataNodeList) {
@@ -77,4 +55,12 @@ public class DataPartitionInfo {
       String StorageGroup, int deviceGroup, DataPartitionRule rule) {
     // TODO: Data partition policy by @YongzaoDan
   }
+
+  public void serializeImpl(ByteBuffer buffer) {
+    SerializeDeserializeUtil.writeIntMapLists(dataRegionDataNodesMap, buffer);
+  }
+
+  public void deserializeImpl(ByteBuffer buffer) {
+    dataRegionDataNodesMap = SerializeDeserializeUtil.readIntMapLists(buffer);
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
deleted file mode 100644
index bc76b9f..0000000
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.partition;
-
-import org.apache.iotdb.confignode.conf.ConfigNodeConf;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
-import 
org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
-import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * PartitionTable stores schema partition table, data partition table, 
DataNode information,
- * StorageGroup schema and real-time write load allocation rules. The 
PartitionTable is thread-safe.
- */
-public class PartitionTable {
-
-  private static final ConfigNodeConf conf = 
ConfigNodeDescriptor.getInstance().getConf();
-  private static final int regionReplicaCount = conf.getRegionReplicaCount();
-  private static final int schemaRegionCount = conf.getSchemaRegionCount();
-  private static final int dataRegionCount = conf.getDataRegionCount();
-
-  private final ReentrantReadWriteLock lock;
-  // TODO: Serialize and Deserialize
-  private final Map<String, StorageGroupSchema> storageGroupsMap;
-
-  // TODO: Serialize and Deserialize
-  private int nextDataNode = 0;
-  // TODO: Serialize and Deserialize
-  private int nextSchemaRegionGroup = 0;
-  // TODO: Serialize and Deserialize
-  private int nextDataRegionGroup = 0;
-  // TODO: Serialize and Deserialize
-  private final Map<Integer, DataNodeInfo> dataNodesMap; // Map<DataNodeID, 
DataNodeInfo>
-
-  // TODO: Serialize and Deserialize
-  private final SchemaPartitionInfo schemaPartition;
-
-  // TODO: Serialize and Deserialize
-  private final DataPartitionInfo dataPartition;
-
-  public PartitionTable() {
-    this.lock = new ReentrantReadWriteLock();
-    this.storageGroupsMap = new HashMap<>();
-    this.dataNodesMap = new HashMap<>();
-    this.schemaPartition = new SchemaPartitionInfo();
-    this.dataPartition = new DataPartitionInfo();
-  }
-
-  public TSStatus registerDataNode(RegisterDataNodePlan plan) {
-    TSStatus result;
-    DataNodeInfo info = plan.getInfo();
-    lock.writeLock().lock();
-
-    if (dataNodesMap.containsValue(info)) {
-      // TODO: optimize
-      result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      for (Map.Entry<Integer, DataNodeInfo> entry : dataNodesMap.entrySet()) {
-        if (entry.getValue().equals(info)) {
-          result.setMessage(String.valueOf(entry.getKey()));
-          break;
-        }
-      }
-    } else {
-      info.setDataNodeID(nextDataNode);
-      dataNodesMap.put(info.getDataNodeID(), info);
-      result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      result.setMessage(String.valueOf(nextDataNode));
-      nextDataNode += 1;
-    }
-
-    lock.writeLock().unlock();
-    return result;
-  }
-
-  public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
-    DataNodesInfoDataSet result;
-    lock.readLock().lock();
-
-    if (dataNodesMap.size() == 0) {
-      result = null;
-    } else {
-      result = new DataNodesInfoDataSet();
-
-      if (plan.getDataNodeID() == -1) {
-        result.setInfoList(new ArrayList<>(dataNodesMap.values()));
-      } else {
-        if (dataNodesMap.containsKey(plan.getDataNodeID())) {
-          
result.setInfoList(Collections.singletonList(dataNodesMap.get(plan.getDataNodeID())));
-        } else {
-          result = null;
-        }
-      }
-    }
-
-    lock.readLock().unlock();
-    return result;
-  }
-
-  public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
-    TSStatus result;
-    lock.writeLock().lock();
-
-    if (dataNodesMap.size() < regionReplicaCount) {
-      result = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-      result.setMessage("DataNode is not enough, please register more.");
-    } else {
-      if (storageGroupsMap.containsKey(plan.getSchema().getName())) {
-        result = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-        result.setMessage(
-            String.format("StorageGroup %s is already set.", 
plan.getSchema().getName()));
-      } else {
-        StorageGroupSchema schema = new 
StorageGroupSchema(plan.getSchema().getName());
-        regionAllocation(schema);
-        storageGroupsMap.put(schema.getName(), schema);
-        result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      }
-    }
-
-    lock.writeLock().unlock();
-    return result;
-  }
-
-  private void regionAllocation(StorageGroupSchema schema) {
-    // TODO: Use CopySet algorithm to optimize region allocation policy
-    for (int i = 0; i < schemaRegionCount; i++) {
-      List<Integer> dataNodeList = new ArrayList<>(dataNodesMap.keySet());
-      Collections.shuffle(dataNodeList);
-      for (int j = 0; j < regionReplicaCount; j++) {
-        
dataNodesMap.get(dataNodeList.get(j)).addSchemaRegionGroup(nextSchemaRegionGroup);
-      }
-      schemaPartition.createSchemaRegion(
-          nextSchemaRegionGroup, dataNodeList.subList(0, regionReplicaCount));
-      schema.addSchemaRegionGroup(nextSchemaRegionGroup);
-      nextSchemaRegionGroup += 1;
-    }
-    for (int i = 0; i < dataRegionCount; i++) {
-      List<Integer> dataNodeList = new ArrayList<>(dataNodesMap.keySet());
-      Collections.shuffle(dataNodeList);
-      for (int j = 0; j < regionReplicaCount; j++) {
-        
dataNodesMap.get(dataNodeList.get(j)).addDataRegionGroup(nextDataRegionGroup);
-      }
-      dataPartition.createDataRegion(
-          nextDataRegionGroup, dataNodeList.subList(0, regionReplicaCount));
-      schema.addDataRegionGroup(nextDataRegionGroup);
-      nextDataRegionGroup += 1;
-    }
-  }
-
-  public StorageGroupSchemaDataSet getStorageGroupSchema() {
-    StorageGroupSchemaDataSet result;
-    lock.readLock().lock();
-
-    if (storageGroupsMap.size() == 0) {
-      result = null;
-    } else {
-      result = new StorageGroupSchemaDataSet(new 
ArrayList<>(storageGroupsMap.values()));
-    }
-
-    lock.readLock().unlock();
-    return result;
-  }
-}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
index 3595598..d4ae3c6 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -21,42 +21,69 @@ package org.apache.iotdb.confignode.partition;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class SchemaPartitionInfo {
 
-  // TODO: Serialize and Deserialize
-  // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionID>>
-  private final Map<String, Map<Integer, Integer>> schemaPartitionTable;
-  // TODO: Serialize and Deserialize
-  // Map<SchemaRegionID, List<DataNodeID>>
-  private final Map<Integer, List<Integer>> schemaRegionDataNodesMap;
+  // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionReplicaSet>>
+  private Map<String, Map<Integer, SchemaRegionReplicaSet>> 
schemaPartitionInfo;
 
   public SchemaPartitionInfo() {
-    this.schemaPartitionTable = new HashMap<>();
-    this.schemaRegionDataNodesMap = new HashMap<>();
+    schemaPartitionInfo = new HashMap<>();
   }
 
-  public void createSchemaPartition(String storageGroup, int deviceGroup, int 
schemaRegion) {
-    if (!schemaPartitionTable.containsKey(storageGroup)) {
-      schemaPartitionTable.put(storageGroup, new HashMap<>());
-    }
-    schemaPartitionTable.get(storageGroup).put(deviceGroup, schemaRegion);
+  /**
+   * Returns the backing partition table itself (not a copy), so changes made by the caller
+   * are visible to this object.
+   */
+  public Map<String, Map<Integer, SchemaRegionReplicaSet>> 
getSchemaPartitionInfo() {
+    return schemaPartitionInfo;
   }
 
-  public Integer getSchemaPartition(String storageGroup, int deviceGroup) {
-    if (schemaPartitionTable.containsKey(storageGroup)) {
-      return schemaPartitionTable.get(storageGroup).get(deviceGroup);
-    }
-    return null;
+  /**
+   * Replaces the whole partition table. The given map is stored by reference, not copied.
+   *
+   * @param schemaPartitionInfo storage group -> (device group id -> replica set)
+   */
+  public void setSchemaPartitionInfo(
+      Map<String, Map<Integer, SchemaRegionReplicaSet>> schemaPartitionInfo) {
+    this.schemaPartitionInfo = schemaPartitionInfo;
+  }
+
+  /**
+   * Collects the replica sets already assigned to the requested device groups.
+   *
+   * @param storageGroup storage group name
+   * @param deviceGroupIDs device group ids to look up
+   * @return a new map with a single entry for {@code storageGroup}; its value holds only the
+   *     requested ids that have an assignment and is empty when none do
+   */
+  public Map<String, Map<Integer, SchemaRegionReplicaSet>> getSchemaPartition(
+      String storageGroup, List<Integer> deviceGroupIDs) {
+    Map<Integer, SchemaRegionReplicaSet> matched = new HashMap<>();
+    for (Integer deviceGroupID : deviceGroupIDs) {
+      Map<Integer, SchemaRegionReplicaSet> assigned = schemaPartitionInfo.get(storageGroup);
+      if (assigned != null && assigned.containsKey(deviceGroupID)) {
+        matched.put(deviceGroupID, assigned.get(deviceGroupID));
+      }
+    }
+    Map<String, Map<Integer, SchemaRegionReplicaSet>> result = new HashMap<>();
+    result.put(storageGroup, matched);
+    return result;
   }
 
-  public void createSchemaRegion(int schemaRegion, List<Integer> dataNode) {
-    if (!schemaRegionDataNodesMap.containsKey(schemaRegion)) {
-      schemaRegionDataNodesMap.put(schemaRegion, dataNode);
+  /**
+   * Returns the device group ids that have no schema region assigned yet.
+   *
+   * @param storageGroup storage group name
+   * @param deviceGroupIDs device group ids to check
+   * @return the ids without an assignment; the input list itself when the storage group has no
+   *     entry in the partition table
+   */
+  public List<Integer> filterNoAssignDeviceGroupId(
+      String storageGroup, List<Integer> deviceGroupIDs) {
+    if (!schemaPartitionInfo.containsKey(storageGroup)) {
+      return deviceGroupIDs;
     }
+    Map<Integer, SchemaRegionReplicaSet> assigned = schemaPartitionInfo.get(storageGroup);
+    // Test each id on its own: the whole list is never a key of this Integer-keyed map, so
+    // checking containsKey(deviceGroupIDs) would keep every id, assigned or not.
+    return deviceGroupIDs.stream()
+        .filter(id -> !assigned.containsKey(id))
+        .collect(Collectors.toList());
   }
 
-  public List<Integer> getSchemaRegionLocation(int schemaRegionGroup) {
-    return schemaRegionDataNodesMap.get(schemaRegionGroup);
+  /**
+   * Assigns a replica set to one device group, creating the storage group's inner map on
+   * first use. An existing assignment for the same device group is overwritten.
+   *
+   * @param storageGroup storage group name
+   * @param deviceGroupId device group id
+   * @param schemaRegionReplicaSet replica set to store for that device group
+   */
+  public void setSchemaRegionReplicaSet(
+      String storageGroup, int deviceGroupId, SchemaRegionReplicaSet 
schemaRegionReplicaSet) {
+    schemaPartitionInfo
+        .computeIfAbsent(storageGroup, value -> new HashMap<>())
+        .put(deviceGroupId, schemaRegionReplicaSet);
   }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
similarity index 58%
copy from 
confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
index 3595598..9bde9be 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaPartitionInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
@@ -18,39 +18,24 @@
  */
 package org.apache.iotdb.confignode.partition;
 
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class SchemaPartitionInfo {
+public class SchemaRegionInfo {
 
   // TODO: Serialize and Deserialize
-  // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionID>>
-  private final Map<String, Map<Integer, Integer>> schemaPartitionTable;
-  // TODO: Serialize and Deserialize
   // Map<SchemaRegionID, List<DataNodeID>>
-  private final Map<Integer, List<Integer>> schemaRegionDataNodesMap;
+  private Map<Integer, List<Integer>> schemaRegionDataNodesMap;
 
-  public SchemaPartitionInfo() {
-    this.schemaPartitionTable = new HashMap<>();
+  public SchemaRegionInfo() {
     this.schemaRegionDataNodesMap = new HashMap<>();
   }
 
-  public void createSchemaPartition(String storageGroup, int deviceGroup, int 
schemaRegion) {
-    if (!schemaPartitionTable.containsKey(storageGroup)) {
-      schemaPartitionTable.put(storageGroup, new HashMap<>());
-    }
-    schemaPartitionTable.get(storageGroup).put(deviceGroup, schemaRegion);
-  }
-
-  public Integer getSchemaPartition(String storageGroup, int deviceGroup) {
-    if (schemaPartitionTable.containsKey(storageGroup)) {
-      return schemaPartitionTable.get(storageGroup).get(deviceGroup);
-    }
-    return null;
-  }
-
-  public void createSchemaRegion(int schemaRegion, List<Integer> dataNode) {
+  public void addSchemaRegion(int schemaRegion, List<Integer> dataNode) {
     if (!schemaRegionDataNodesMap.containsKey(schemaRegion)) {
       schemaRegionDataNodesMap.put(schemaRegion, dataNode);
     }
@@ -59,4 +44,16 @@ public class SchemaPartitionInfo {
   public List<Integer> getSchemaRegionLocation(int schemaRegionGroup) {
     return schemaRegionDataNodesMap.get(schemaRegionGroup);
   }
+
+  /** Returns the backing schema-region-id to data-node-id-list map itself, not a copy. */
+  public Map<Integer, List<Integer>> getSchemaRegionDataNodesMap() {
+    return schemaRegionDataNodesMap;
+  }
+
+  /**
+   * Writes the region-to-data-nodes map into {@code buffer} through {@code
+   * SerializeDeserializeUtil.writeIntMapLists}.
+   *
+   * @param buffer destination buffer
+   */
+  public void serializeImpl(ByteBuffer buffer) {
+    SerializeDeserializeUtil.writeIntMapLists(schemaRegionDataNodesMap, 
buffer);
+  }
+
+  /**
+   * Replaces the region-to-data-nodes map with the one read from {@code buffer} through {@code
+   * SerializeDeserializeUtil.readIntMapLists}; the previous map is discarded.
+   *
+   * @param buffer source buffer
+   */
+  public void deserializeImpl(ByteBuffer buffer) {
+    schemaRegionDataNodesMap = 
SerializeDeserializeUtil.readIntMapLists(buffer);
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionReplicaSet.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionReplicaSet.java
new file mode 100644
index 0000000..754af29
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionReplicaSet.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.partition;
+
+import org.apache.iotdb.consensus.common.Endpoint;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A schema region id together with the list of endpoints recorded for it. */
+public class SchemaRegionReplicaSet {
+  private int schemaRegionId;
+  private List<Endpoint> endPointList;
+
+  public int getSchemaRegionId() {
+    return schemaRegionId;
+  }
+
+  public void setSchemaRegionId(int schemaRegionId) {
+    this.schemaRegionId = schemaRegionId;
+  }
+
+  /** Returns the stored endpoint list itself; {@code null} until set or deserialized. */
+  public List<Endpoint> getEndPointList() {
+    return endPointList;
+  }
+
+  public void setEndPointList(List<Endpoint> endPointList) {
+    this.endPointList = endPointList;
+  }
+
+  /**
+   * Writes the region id, the endpoint count and then every endpoint into {@code buffer}.
+   *
+   * @param buffer destination buffer
+   * @throws NullPointerException if the endpoint list has not been set
+   */
+  public void serializeImpl(ByteBuffer buffer) {
+    buffer.putInt(schemaRegionId);
+
+    buffer.putInt(endPointList.size());
+    for (Endpoint endpoint : endPointList) {
+      endpoint.serializeImpl(buffer);
+    }
+  }
+
+  /**
+   * Reads the region id and the endpoints from {@code buffer}, in the layout written by {@link
+   * #serializeImpl(ByteBuffer)}.
+   *
+   * @param buffer source buffer
+   */
+  public void deserializeImpl(ByteBuffer buffer) {
+    schemaRegionId = buffer.getInt();
+    int size = buffer.getInt();
+
+    // Always start from a fresh list: appending to a previously populated list would keep
+    // stale endpoints next to the ones just read.
+    List<Endpoint> decoded = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      Endpoint endpoint = new Endpoint();
+      endpoint.deserializeImpl(buffer);
+      decoded.add(endpoint);
+    }
+    endPointList = decoded;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("SchemaRegionReplicaSet {");
+    sb.append("schemaRegionId = ").append(schemaRegionId);
+    // The list is null on a freshly constructed instance; toString must not throw then.
+    if (endPointList != null) {
+      for (Endpoint endpoint : endPointList) {
+        sb.append(", EndPoint = ").append(endpoint);
+      }
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
new file mode 100644
index 0000000..0da1ee9
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
+import org.apache.iotdb.confignode.partition.DataNodeInfo;
+import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** Singleton registry of the data nodes known to this config node, keyed by data node id. */
+public class DataNodeInfoPersistence {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInfoPersistence.class);
+
+  /** online data nodes */
+  private final ConcurrentNavigableMap<Integer, DataNodeInfo> onlineDataNodes =
+      new ConcurrentSkipListMap<>();
+
+  /** For remove node or draining node */
+  private final Set<DataNodeInfo> drainingDataNodes = new HashSet<>();
+
+  private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
+
+  private DataNodeInfoPersistence() {
+    this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
+  }
+
+  /** Returns the live map of online data nodes, not a copy. */
+  public ConcurrentNavigableMap<Integer, DataNodeInfo> getOnlineDataNodes() {
+    return onlineDataNodes;
+  }
+
+  public boolean containsValue(DataNodeInfo info) {
+    return onlineDataNodes.containsValue(info);
+  }
+
+  public void put(int dataNodeID, DataNodeInfo info) {
+    onlineDataNodes.put(dataNodeID, info);
+  }
+
+  /**
+   * Looks up the id under which an equal data node is registered.
+   *
+   * @param info data node to search for, compared with {@code equals}
+   * @return the id of the registered entry, or -1 if no online data node equals {@code info}
+   */
+  public int getDataNodeInfo(DataNodeInfo info) {
+    // TODO: optimize
+    for (Map.Entry<Integer, DataNodeInfo> entry : onlineDataNodes.entrySet()) {
+      if (entry.getValue().equals(info)) {
+        // Report the id the node is registered under, not whatever id the argument carries.
+        return entry.getKey();
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Registers a data node when it starts.
+   *
+   * @param plan RegisterDataNodePlan
+   * @return SUCCESS_STATUS with the data node id as message if the node was not registered
+   *     before, INTERNAL_SERVER_ERROR otherwise
+   */
+  public TSStatus registerDataNode(RegisterDataNodePlan plan) {
+    TSStatus result;
+    DataNodeInfo info = plan.getInfo();
+    dataNodeInfoReadWriteLock.writeLock().lock();
+    try {
+      if (onlineDataNodes.containsValue(info)) {
+        result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+        result.setMessage(
+            String.format(
+                "DataNode %s is already registered.", plan.getInfo().getEndPoint().toString()));
+      } else {
+        onlineDataNodes.put(info.getDataNodeID(), info);
+        result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        result.setMessage(String.valueOf(info.getDataNodeID()));
+        LOGGER.info("Register data node success, data node is {}", plan);
+      }
+    } finally {
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+    }
+    return result;
+  }
+
+  /**
+   * Gets data node info.
+   *
+   * @param plan QueryDataNodeInfoPlan
+   * @return all online data nodes if the plan's data node id is -1; otherwise the single
+   *     matching node, or an empty list if that id is not registered
+   */
+  public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
+    DataNodesInfoDataSet result = new DataNodesInfoDataSet();
+    int dataNodeId = plan.getDataNodeID();
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try {
+      if (dataNodeId == -1) {
+        result.setInfoList(new ArrayList<>(onlineDataNodes.values()));
+      } else {
+        DataNodeInfo info = onlineDataNodes.get(dataNodeId);
+        if (info == null) {
+          // Unknown id: hand back an empty list rather than a list holding a null element.
+          result.setInfoList(Collections.<DataNodeInfo>emptyList());
+        } else {
+          result.setInfoList(Collections.singletonList(info));
+        }
+      }
+    } finally {
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+
+    return result;
+  }
+
+  /** Returns a live view of the registered data node ids. */
+  public Set<Integer> getDataNodeIds() {
+    return onlineDataNodes.keySet();
+  }
+
+  /**
+   * Add schema region group; does nothing if the data node is not registered.
+   *
+   * @param dataNodeId data node id
+   * @param schemaRegionGroup schema region group
+   */
+  public void addSchemaRegionGroup(int dataNodeId, int schemaRegionGroup) {
+    dataNodeInfoReadWriteLock.writeLock().lock();
+    try {
+      DataNodeInfo info = onlineDataNodes.get(dataNodeId);
+      if (info != null) {
+        info.addSchemaRegionGroup(schemaRegionGroup);
+      }
+    } finally {
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Add data region group; does nothing if the data node is not registered.
+   *
+   * @param dataNodeId data node id
+   * @param dataRegionGroup data region group
+   */
+  public void addDataRegionGroup(int dataNodeId, int dataRegionGroup) {
+    dataNodeInfoReadWriteLock.writeLock().lock();
+    try {
+      DataNodeInfo info = onlineDataNodes.get(dataNodeId);
+      if (info != null) {
+        // Record it as a data region group, not as a schema region group.
+        info.addDataRegionGroup(dataRegionGroup);
+      }
+    } finally {
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  @TestOnly
+  public void clear() {
+    onlineDataNodes.clear();
+    drainingDataNodes.clear();
+  }
+
+  private static class DataNodeInfoPersistenceHolder {
+
+    private static final DataNodeInfoPersistence INSTANCE = new DataNodeInfoPersistence();
+
+    private DataNodeInfoPersistenceHolder() {
+      // empty constructor
+    }
+  }
+
+  public static DataNodeInfoPersistence getInstance() {
+    return DataNodeInfoPersistenceHolder.INSTANCE;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
new file mode 100644
index 0000000..578a22d
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
+import org.apache.iotdb.confignode.partition.DataPartitionInfo;
+import org.apache.iotdb.confignode.partition.SchemaPartitionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionReplicaSet;
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** Manages the schema partition table; the data partition operations are still stubs. */
+public class PartitionInfoPersistence {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionInfoPersistence.class);
+
+  /** schema partition read write lock */
+  private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
+
+  /** data partition read write lock */
+  private final ReentrantReadWriteLock dataPartitionReadWriteLock;
+
+  // TODO: Serialize and Deserialize
+  private final SchemaPartitionInfo schemaPartition;
+
+  // TODO: Serialize and Deserialize
+  private final DataPartitionInfo dataPartition;
+
+  public PartitionInfoPersistence() {
+    this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
+    this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
+    this.schemaPartition = new SchemaPartitionInfo();
+    this.dataPartition = new DataPartitionInfo();
+  }
+
+  /**
+   * Get schema partition
+   *
+   * @param physicalPlan storageGroup and deviceGroupIDs
+   * @return a data set holding the assignments found for the requested device groups; its
+   *     partition info has no device group entries if none are assigned
+   */
+  public DataSet getSchemaPartition(SchemaPartitionPlan physicalPlan) {
+    SchemaPartitionDataSet schemaPartitionDataSet = new SchemaPartitionDataSet();
+    schemaPartitionReadWriteLock.readLock().lock();
+    try {
+      String storageGroup = physicalPlan.getStorageGroup();
+      List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
+      SchemaPartitionInfo schemaPartitionInfo = new SchemaPartitionInfo();
+      schemaPartitionInfo.setSchemaPartitionInfo(
+          schemaPartition.getSchemaPartition(storageGroup, deviceGroupIDs));
+      schemaPartitionDataSet.setSchemaPartitionInfo(schemaPartitionInfo);
+    } finally {
+      schemaPartitionReadWriteLock.readLock().unlock();
+    }
+    return schemaPartitionDataSet;
+  }
+
+  /**
+   * Stores the replica sets carried by the plan, then returns the resulting schema partition.
+   *
+   * @param physicalPlan storage group, device group ids and the replica set chosen for each
+   * @return Schema Partition data set
+   */
+  public DataSet applySchemaPartition(SchemaPartitionPlan physicalPlan) {
+    String storageGroup = physicalPlan.getStorageGroup();
+
+    // allocate partition by storage group and device group id
+    Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets =
+        physicalPlan.getDeviceGroupIdReplicaSets();
+    schemaPartitionReadWriteLock.writeLock().lock();
+    try {
+      deviceGroupIdReplicaSets.forEach(
+          (deviceGroupId, replicaSet) ->
+              schemaPartition.setSchemaRegionReplicaSet(storageGroup, deviceGroupId, replicaSet));
+    } finally {
+      schemaPartitionReadWriteLock.writeLock().unlock();
+    }
+
+    return getSchemaPartition(physicalPlan);
+  }
+
+  /**
+   * TODO: allocate data partition by balancer
+   *
+   * @param physicalPlan physical plan
+   * @return currently always {@code null} (not implemented yet)
+   */
+  public DataSet applyDataPartition(DataPartitionPlan physicalPlan) {
+    return null;
+  }
+
+  /**
+   * TODO: query data partition
+   *
+   * @param physicalPlan physical plan
+   * @return currently always {@code null} (not implemented yet)
+   */
+  public DataSet getDataPartition(DataPartitionPlan physicalPlan) {
+    return null;
+  }
+
+  /**
+   * Returns the device group ids of a storage group that have no schema region assigned.
+   *
+   * @param storageGroup storage group name
+   * @param deviceGroupIDs device group ids to check
+   * @return the unassigned ids
+   */
+  public List<Integer> filterSchemaRegionNoAssignDeviceGroupId(
+      String storageGroup, List<Integer> deviceGroupIDs) {
+    // Read under the same lock that guards every other access to the schema partition table.
+    schemaPartitionReadWriteLock.readLock().lock();
+    try {
+      return schemaPartition.filterNoAssignDeviceGroupId(storageGroup, deviceGroupIDs);
+    } finally {
+      schemaPartitionReadWriteLock.readLock().unlock();
+    }
+  }
+
+  @TestOnly
+  public void clear() {
+    if (schemaPartition.getSchemaPartitionInfo() != null) {
+      schemaPartition.getSchemaPartitionInfo().clear();
+    }
+
+    if (dataPartition.getDataPartitionMap() != null) {
+      dataPartition.getDataPartitionMap().clear();
+    }
+  }
+
+  private static class PartitionInfoPersistenceHolder {
+
+    private static final PartitionInfoPersistence INSTANCE = new PartitionInfoPersistence();
+
+    private PartitionInfoPersistenceHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PartitionInfoPersistence getInstance() {
+    return PartitionInfoPersistenceHolder.INSTANCE;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
new file mode 100644
index 0000000..c67fa1d
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import 
org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
+import org.apache.iotdb.confignode.partition.DataRegionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionReplicaSet;
+import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Singleton holding the registered storage groups together with the schema region and data
+ * region to data node assignments.
+ */
+public class RegionInfoPersistence {
+
+  /** partition read write lock */
+  private final ReentrantReadWriteLock partitionReadWriteLock;
+
+  // TODO: Serialize and Deserialize
+  // storageGroupName -> StorageGroupSchema
+  private final Map<String, StorageGroupSchema> storageGroupsMap;
+
+  // NOTE(review): the two counters below are never read or written inside this class.
+  // TODO: Serialize and Deserialize
+  private int nextSchemaRegionGroup = 0;
+  // TODO: Serialize and Deserialize
+  private int nextDataRegionGroup = 0;
+
+  // TODO: Serialize and Deserialize
+  private final SchemaRegionInfo schemaRegion;
+
+  // TODO: Serialize and Deserialize
+  private final DataRegionInfo dataRegion;
+
+  public RegionInfoPersistence() {
+    this.partitionReadWriteLock = new ReentrantReadWriteLock();
+    this.storageGroupsMap = new HashMap<>();
+    this.schemaRegion = new SchemaRegionInfo();
+    this.dataRegion = new DataRegionInfo();
+  }
+
+  /**
+   * Registers a new storage group and records the region assignments carried by the plan:
+   * each schema/data region is stored with its data node list, and every listed data node is
+   * told about the region through {@code DataNodeInfoPersistence}.
+   *
+   * @param plan SetStorageGroupPlan
+   * @return SUCCESS_STATUS once the assignments are recorded; INTERNAL_SERVER_ERROR with a
+   *     message if a storage group with the same name is already present
+   */
+  public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
+    TSStatus result;
+    partitionReadWriteLock.writeLock().lock();
+    try {
+      if (storageGroupsMap.containsKey(plan.getSchema().getName())) {
+        result = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+        result.setMessage(
+            String.format("StorageGroup %s is already set.", 
plan.getSchema().getName()));
+      } else {
+        // NOTE(review): the stored schema is built from the name only and no region group id
+        // is added to it in this method - confirm that this is intended.
+        StorageGroupSchema schema = new 
StorageGroupSchema(plan.getSchema().getName());
+        storageGroupsMap.put(schema.getName(), schema);
+
+        // schema regions: region id -> data node ids
+        plan.getSchemaRegionInfo()
+            .getSchemaRegionDataNodesMap()
+            .entrySet()
+            .forEach(
+                entity -> {
+                  schemaRegion.addSchemaRegion(entity.getKey(), 
entity.getValue());
+                  entity
+                      .getValue()
+                      .forEach(
+                          dataNodeId -> {
+                            DataNodeInfoPersistence.getInstance()
+                                .addSchemaRegionGroup(dataNodeId, 
entity.getKey());
+                          });
+                });
+
+        // data regions: region id -> data node ids
+        plan.getDataRegionInfo()
+            .getDataRegionDataNodesMap()
+            .entrySet()
+            .forEach(
+                entity -> {
+                  dataRegion.createDataRegion(entity.getKey(), 
entity.getValue());
+                  entity
+                      .getValue()
+                      .forEach(
+                          dataNodeId -> {
+                            DataNodeInfoPersistence.getInstance()
+                                .addDataRegionGroup(dataNodeId, 
entity.getKey());
+                          });
+                });
+
+        result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      }
+    } finally {
+      partitionReadWriteLock.writeLock().unlock();
+    }
+    return result;
+  }
+
+  /** Returns a data set holding a snapshot list of all registered storage group schemas. */
+  public StorageGroupSchemaDataSet getStorageGroupSchema() {
+    StorageGroupSchemaDataSet result = new StorageGroupSchemaDataSet();
+    partitionReadWriteLock.readLock().lock();
+    try {
+      result.setSchemaList(new ArrayList<>(storageGroupsMap.values()));
+    } finally {
+      partitionReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
+  /**
+   * Builds one replica set per known schema region. Data node ids that are not among the
+   * online data nodes are left out of the endpoint list.
+   *
+   * <p>NOTE(review): the region map is read here without taking partitionReadWriteLock.
+   *
+   * @return list of replica sets, each carrying a schema region id and its endpoint list
+   */
+  public List<SchemaRegionReplicaSet> getSchemaRegionEndPoint() {
+    List<SchemaRegionReplicaSet> schemaRegionEndPoints = new ArrayList<>();
+
+    schemaRegion
+        .getSchemaRegionDataNodesMap()
+        .entrySet()
+        .forEach(
+            entity -> {
+              SchemaRegionReplicaSet schemaRegionReplicaSet = new 
SchemaRegionReplicaSet();
+              List<Endpoint> endPoints = new ArrayList<>();
+              entity
+                  .getValue()
+                  .forEach(
+                      dataNodeId -> {
+                        if (DataNodeInfoPersistence.getInstance()
+                            .getOnlineDataNodes()
+                            .containsKey(dataNodeId)) {
+                          endPoints.add(
+                              DataNodeInfoPersistence.getInstance()
+                                  .getOnlineDataNodes()
+                                  .get(dataNodeId)
+                                  .getEndPoint());
+                        }
+                      });
+              schemaRegionReplicaSet.setSchemaRegionId(entity.getKey());
+              schemaRegionReplicaSet.setEndPointList(endPoints);
+              schemaRegionEndPoints.add(schemaRegionReplicaSet);
+            });
+    return schemaRegionEndPoints;
+  }
+
+  /**
+   * Tells whether a storage group with the given name is registered.
+   *
+   * <p>NOTE(review): reads storageGroupsMap without taking partitionReadWriteLock.
+   */
+  public boolean containsStorageGroup(String storageName) {
+    return storageGroupsMap.containsKey(storageName);
+  }
+
+  @TestOnly
+  public void clear() {
+    storageGroupsMap.clear();
+    schemaRegion.getSchemaRegionDataNodesMap().clear();
+    dataRegion.getDataRegionDataNodesMap().clear();
+  }
+
+  private static class RegionInfoPersistenceHolder {
+
+    private static final RegionInfoPersistence INSTANCE = new 
RegionInfoPersistence();
+
+    private RegionInfoPersistenceHolder() {
+      // empty constructor
+    }
+  }
+
+  public static RegionInfoPersistence getInstance() {
+    return RegionInfoPersistence.RegionInfoPersistenceHolder.INSTANCE;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
index 858cc9d..abeccbe 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iotdb.confignode.physical;
 
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
 import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
@@ -92,6 +94,18 @@ public abstract class PhysicalPlan implements 
IConsensusRequest {
         case QueryStorageGroupSchema:
           plan = new QueryStorageGroupSchemaPlan();
           break;
+        case QueryDataPartition:
+          plan = new DataPartitionPlan(PhysicalPlanType.QueryDataPartition);
+          break;
+        case ApplyDataPartition:
+          plan = new DataPartitionPlan(PhysicalPlanType.ApplyDataPartition);
+          break;
+        case QuerySchemaPartition:
+          plan = new 
SchemaPartitionPlan(PhysicalPlanType.QuerySchemaPartition);
+          break;
+        case ApplySchemaPartition:
+          plan = new 
SchemaPartitionPlan(PhysicalPlanType.ApplySchemaPartition);
+          break;
         default:
           throw new IOException("unknown PhysicalPlan type: " + typeNum);
       }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
index 8706e22..d7606b4 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
@@ -24,5 +24,9 @@ public enum PhysicalPlanType {
   SetStorageGroup,
   DeleteStorageGroup,
   QueryStorageGroupSchema,
-  CreateRegion
+  CreateRegion,
+  QueryDataPartition,
+  ApplyDataPartition,
+  QuerySchemaPartition,
+  ApplySchemaPartition
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
new file mode 100644
index 0000000..556944e
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.sys;
+
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Physical plan that either queries or applies DataPartitions for one storage group.
+ *
+ * <p>The concrete operation is the {@link PhysicalPlanType} handed to the constructor ({@code
+ * QueryDataPartition} or {@code ApplyDataPartition}); the partitions of interest are described by
+ * the storage group name and {@code deviceGroupStartTimeMap}.
+ */
+public class DataPartitionPlan extends PhysicalPlan {
+  private String storageGroup;
+  // NOTE(review): presumably device group id -> start times of the wanted partitions; confirm
+  private Map<Integer, List<Integer>> deviceGroupStartTimeMap;
+
+  /**
+   * Creates an empty plan of the given type; the fields are expected to be filled in afterwards,
+   * e.g. by {@link #deserializeImpl(ByteBuffer)}.
+   *
+   * @param physicalPlanType the operation this plan represents
+   */
+  public DataPartitionPlan(PhysicalPlanType physicalPlanType) {
+    super(physicalPlanType);
+  }
+
+  /**
+   * Creates a fully populated plan.
+   *
+   * @param physicalPlanType the operation this plan represents
+   * @param storageGroup name of the storage group
+   * @param deviceGroupStartTimeMap map stored as-is (not copied) in this plan
+   */
+  public DataPartitionPlan(
+      PhysicalPlanType physicalPlanType,
+      String storageGroup,
+      Map<Integer, List<Integer>> deviceGroupStartTimeMap) {
+    this(physicalPlanType);
+    this.storageGroup = storageGroup;
+    this.deviceGroupStartTimeMap = deviceGroupStartTimeMap;
+  }
+
+  public String getStorageGroup() {
+    return storageGroup;
+  }
+
+  public void setStorageGroup(String storageGroup) {
+    this.storageGroup = storageGroup;
+  }
+
+  /** Returns the {@code deviceGroupStartTimeMap} held by this plan (may be null). */
+  public Map<Integer, List<Integer>> getDeviceGroupIDs() {
+    return deviceGroupStartTimeMap;
+  }
+
+  /** Replaces the {@code deviceGroupStartTimeMap} held by this plan. */
+  public void setDeviceGroupIDs(Map<Integer, List<Integer>> deviceGroupIDs) {
+    this.deviceGroupStartTimeMap = deviceGroupIDs;
+  }
+
+  /**
+   * Writes the plan type ordinal followed by the storage group and the device group map.
+   *
+   * @param buffer destination buffer
+   */
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    // Write this plan's own type: a hard-coded QueryDataPartition ordinal would make an
+    // ApplyDataPartition plan come back as a query after deserialization.
+    buffer.putInt(getType().ordinal());
+    SerializeDeserializeUtil.write(storageGroup, buffer);
+    SerializeDeserializeUtil.writeIntMapLists(deviceGroupStartTimeMap, buffer);
+  }
+
+  /**
+   * Reads the storage group and the device group map, in the order written by {@link
+   * #serializeImpl(ByteBuffer)}. The type ordinal is not read here.
+   *
+   * @param buffer source buffer positioned right after the type ordinal
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) {
+    storageGroup = SerializeDeserializeUtil.readString(buffer);
+    deviceGroupStartTimeMap = SerializeDeserializeUtil.readIntMapLists(buffer);
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
new file mode 100644
index 0000000..adf9c45
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.sys;
+
+import org.apache.iotdb.confignode.partition.SchemaRegionReplicaSet;
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Physical plan that queries or applies SchemaPartitions for the given storage group and device
+ * group ids. The concrete operation is the {@link PhysicalPlanType} handed to the constructor.
+ */
+public class SchemaPartitionPlan extends PhysicalPlan {
+  private String storageGroup;
+  private List<Integer> deviceGroupIDs;
+  private Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets;
+
+  /**
+   * Creates an empty plan of the given type; the fields are expected to be filled in afterwards,
+   * e.g. by {@link #deserializeImpl(ByteBuffer)}.
+   *
+   * @param physicalPlanType the operation this plan represents
+   */
+  public SchemaPartitionPlan(PhysicalPlanType physicalPlanType) {
+    super(physicalPlanType);
+  }
+
+  /**
+   * Creates a plan for the given storage group and device group ids. The replica-set map is left
+   * unset until {@link #setDeviceGroupIdReplicaSet(Map)} is called.
+   *
+   * @param physicalPlanType the operation this plan represents
+   * @param storageGroup name of the storage group
+   * @param deviceGroupIDs device group ids, stored as-is (not copied)
+   */
+  public SchemaPartitionPlan(
+      PhysicalPlanType physicalPlanType, String storageGroup, List<Integer> deviceGroupIDs) {
+    this(physicalPlanType);
+    this.storageGroup = storageGroup;
+    this.deviceGroupIDs = deviceGroupIDs;
+  }
+
+  public void setDeviceGroupIdReplicaSet(
+      Map<Integer, SchemaRegionReplicaSet> deviceGroupIdReplicaSets) {
+    this.deviceGroupIdReplicaSets = deviceGroupIdReplicaSets;
+  }
+
+  public Map<Integer, SchemaRegionReplicaSet> getDeviceGroupIdReplicaSets() {
+    return deviceGroupIdReplicaSets;
+  }
+
+  /**
+   * Writes the plan type ordinal, the storage group, the device group ids and finally every
+   * (device group id, replica set) pair. A null id list or replica-set map is written as an empty
+   * collection.
+   *
+   * @param buffer destination buffer
+   */
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    // Write this plan's own type; a hard-coded QueryDataPartition ordinal would make the plan
+    // factory rebuild a DataPartitionPlan from these bytes.
+    buffer.putInt(getType().ordinal());
+    SerializeDeserializeUtil.write(storageGroup, buffer);
+
+    if (deviceGroupIDs == null) {
+      buffer.putInt(0);
+    } else {
+      buffer.putInt(deviceGroupIDs.size());
+      deviceGroupIDs.forEach(id -> SerializeDeserializeUtil.write(id, buffer));
+    }
+
+    // The replica sets are only attached after construction, so they may legitimately be absent.
+    if (deviceGroupIdReplicaSets == null) {
+      buffer.putInt(0);
+      return;
+    }
+    buffer.putInt(deviceGroupIdReplicaSets.size());
+    for (Map.Entry<Integer, SchemaRegionReplicaSet> entry : deviceGroupIdReplicaSets.entrySet()) {
+      buffer.putInt(entry.getKey());
+      entry.getValue().serializeImpl(buffer);
+    }
+  }
+
+  /**
+   * Reads the fields in exactly the order written by {@link #serializeImpl(ByteBuffer)}. The type
+   * ordinal is not read here.
+   *
+   * @param buffer source buffer positioned right after the type ordinal
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) {
+    storageGroup = SerializeDeserializeUtil.readString(buffer);
+
+    // Instances created through the single-argument constructor have no id list yet.
+    int idSize = SerializeDeserializeUtil.readInt(buffer);
+    deviceGroupIDs = new java.util.ArrayList<>(Math.max(idSize, 0));
+    for (int i = 0; i < idSize; i++) {
+      deviceGroupIDs.add(SerializeDeserializeUtil.readInt(buffer));
+    }
+
+    if (deviceGroupIdReplicaSets == null) {
+      deviceGroupIdReplicaSets = new HashMap<>();
+    }
+    int size = buffer.getInt();
+    for (int i = 0; i < size; i++) {
+      // The key is serialized before its replica set, so it must be read first.
+      int deviceGroupId = buffer.getInt();
+      SchemaRegionReplicaSet schemaRegionReplicaSet = new SchemaRegionReplicaSet();
+      schemaRegionReplicaSet.deserializeImpl(buffer);
+      deviceGroupIdReplicaSets.put(deviceGroupId, schemaRegionReplicaSet);
+    }
+  }
+
+  public String getStorageGroup() {
+    return storageGroup;
+  }
+
+  public List<Integer> getDeviceGroupIDs() {
+    return deviceGroupIDs;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
index 1b8ec41..bf7a7e9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.confignode.physical.sys;
 
+import org.apache.iotdb.confignode.partition.DataRegionInfo;
+import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
 import org.apache.iotdb.confignode.partition.StorageGroupSchema;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
 import org.apache.iotdb.confignode.physical.PhysicalPlanType;
@@ -28,6 +30,10 @@ public class SetStorageGroupPlan extends PhysicalPlan {
 
   private StorageGroupSchema schema;
 
+  private SchemaRegionInfo schemaRegionInfo;
+
+  private DataRegionInfo dataRegionInfo;
+
   public SetStorageGroupPlan() {
     super(PhysicalPlanType.SetStorageGroup);
     this.schema = new StorageGroupSchema();
@@ -42,14 +48,38 @@ public class SetStorageGroupPlan extends PhysicalPlan {
     return schema;
   }
 
+  public void setSchema(StorageGroupSchema schema) {
+    this.schema = schema;
+  }
+
+  public SchemaRegionInfo getSchemaRegionInfo() {
+    return schemaRegionInfo;
+  }
+
+  public void setSchemaRegionInfo(SchemaRegionInfo schemaRegionInfo) {
+    this.schemaRegionInfo = schemaRegionInfo;
+  }
+
+  public DataRegionInfo getDataRegionInfo() {
+    return dataRegionInfo;
+  }
+
+  public void setDataRegionInfo(DataRegionInfo dataRegionInfo) {
+    this.dataRegionInfo = dataRegionInfo;
+  }
+
   @Override
   protected void serializeImpl(ByteBuffer buffer) {
     buffer.putInt(PhysicalPlanType.SetStorageGroup.ordinal());
     schema.serialize(buffer);
+    schemaRegionInfo.serializeImpl(buffer);
+    dataRegionInfo.serializeImpl(buffer);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) {
     schema.deserialize(buffer);
+    schemaRegionInfo.deserializeImpl(buffer);
+    dataRegionInfo.deserializeImpl(buffer);
   }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/balancer/LoadBalancer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/balancer/LoadBalancer.java
index 4cfb8b3..a7e4472 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/balancer/LoadBalancer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/balancer/LoadBalancer.java
@@ -18,24 +18,12 @@
  */
 package org.apache.iotdb.confignode.service.balancer;
 
-import org.apache.iotdb.confignode.partition.PartitionTable;
-
-import java.util.concurrent.locks.Lock;
-
 /**
  * The LoadBalancer at ConfigNodeGroup-Leader is active for cluster dynamic 
load balancing
  * scheduling
  */
 public class LoadBalancer implements Runnable {
 
-  private final Lock partitionTableLock;
-  private final PartitionTable partitionTable;
-
-  public LoadBalancer(Lock partitionTableLock, PartitionTable partitionTable) {
-    this.partitionTableLock = partitionTableLock;
-    this.partitionTable = partitionTable;
-  }
-
   @Override
   public void run() {}
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
index c9c90af..6f4c14c 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
@@ -19,28 +19,46 @@
 package org.apache.iotdb.confignode.service.executor;
 
 import 
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
-import org.apache.iotdb.confignode.partition.PartitionTable;
+import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
+import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
+import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 public class PlanExecutor {
 
-  private final PartitionTable partitionTable;
+  private final DataNodeInfoPersistence dataNodeInfoPersistence;
+
+  private final RegionInfoPersistence regionInfoPersistence;
+
+  private final PartitionInfoPersistence partitionInfoPersistence;
 
   public PlanExecutor() {
-    this.partitionTable = new PartitionTable();
+    this.dataNodeInfoPersistence = DataNodeInfoPersistence.getInstance();
+    this.regionInfoPersistence = RegionInfoPersistence.getInstance();
+    this.partitionInfoPersistence = PartitionInfoPersistence.getInstance();
   }
 
   public DataSet executorQueryPlan(PhysicalPlan plan) throws 
UnknownPhysicalPlanTypeException {
     switch (plan.getType()) {
       case QueryDataNodeInfo:
-        return partitionTable.getDataNodeInfo((QueryDataNodeInfoPlan) plan);
+        return dataNodeInfoPersistence.getDataNodeInfo((QueryDataNodeInfoPlan) 
plan);
       case QueryStorageGroupSchema:
-        return partitionTable.getStorageGroupSchema();
+        return regionInfoPersistence.getStorageGroupSchema();
+      case QueryDataPartition:
+        return partitionInfoPersistence.getDataPartition((DataPartitionPlan) 
plan);
+      case QuerySchemaPartition:
+        return 
partitionInfoPersistence.getSchemaPartition((SchemaPartitionPlan) plan);
+      case ApplySchemaPartition:
+        return 
partitionInfoPersistence.applySchemaPartition((SchemaPartitionPlan) plan);
+      case ApplyDataPartition:
+        return partitionInfoPersistence.applyDataPartition((DataPartitionPlan) 
plan);
       default:
         throw new UnknownPhysicalPlanTypeException(plan.getType());
     }
@@ -49,9 +67,9 @@ public class PlanExecutor {
   public TSStatus executorNonQueryPlan(PhysicalPlan plan) throws 
UnknownPhysicalPlanTypeException {
     switch (plan.getType()) {
       case RegisterDataNode:
-        return partitionTable.registerDataNode((RegisterDataNodePlan) plan);
+        return dataNodeInfoPersistence.registerDataNode((RegisterDataNodePlan) 
plan);
       case SetStorageGroup:
-        return partitionTable.setStorageGroup((SetStorageGroupPlan) plan);
+        return regionInfoPersistence.setStorageGroup((SetStorageGroupPlan) 
plan);
       default:
         throw new UnknownPhysicalPlanTypeException(plan.getType());
     }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 9619301..1d8dc48 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -19,13 +19,15 @@
 package org.apache.iotdb.confignode.service.thrift.server;
 
 import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
 import 
org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.partition.DataNodeInfo;
 import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
 import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
 import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
@@ -45,9 +47,8 @@ import 
org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Endpoint;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -62,7 +63,6 @@ import java.util.Map;
 
 /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode 
*/
 public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigNodeRPCServerProcessor.class);
 
   private final ConfigManager configManager;
@@ -77,17 +77,17 @@ public class ConfigNodeRPCServerProcessor implements 
ConfigIService.Iface {
     RegisterDataNodePlan plan =
         new RegisterDataNodePlan(
             -1, new Endpoint(req.getEndPoint().getIp(), 
req.getEndPoint().getPort()));
-    ConsensusWriteResponse resp = configManager.write(plan);
+    TSStatus status = configManager.registerDataNode(plan);
     DataNodeRegisterResp result = new DataNodeRegisterResp();
-    result.setRegisterResult(resp.getStatus());
-    if (resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      result.setDataNodeID(Integer.parseInt(resp.getStatus().getMessage()));
+    result.setRegisterResult(status);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      result.setDataNodeID(Integer.parseInt(status.getMessage()));
       LOGGER.info(
           "Register DataNode successful. DataNodeID: {}, {}",
-          resp.getStatus().getMessage(),
+          status.getMessage(),
           req.getEndPoint().toString());
     } else {
-      LOGGER.error("Register DataNode failed. {}", 
resp.getStatus().getMessage());
+      LOGGER.error("Register DataNode failed. {}", status.getMessage());
     }
     return result;
   }
@@ -95,13 +95,13 @@ public class ConfigNodeRPCServerProcessor implements 
ConfigIService.Iface {
   @Override
   public Map<Integer, DataNodeMessage> getDataNodesMessage(int dataNodeID) 
throws TException {
     QueryDataNodeInfoPlan plan = new QueryDataNodeInfoPlan(dataNodeID);
-    ConsensusReadResponse resp = configManager.read(plan);
+    DataSet dataSet = configManager.getDataNodeInfo(plan);
 
-    if (resp.getDataset() == null) {
+    if (dataSet == null) {
       return new HashMap<>();
     } else {
       Map<Integer, DataNodeMessage> result = new HashMap<>();
-      for (DataNodeInfo info : ((DataNodesInfoDataSet) 
resp.getDataset()).getInfoList()) {
+      for (DataNodeInfo info : ((DataNodesInfoDataSet) dataSet).getInfoList()) 
{
         result.put(
             info.getDataNodeID(),
             new DataNodeMessage(
@@ -117,7 +117,8 @@ public class ConfigNodeRPCServerProcessor implements 
ConfigIService.Iface {
     SetStorageGroupPlan plan =
         new SetStorageGroupPlan(
             new 
org.apache.iotdb.confignode.partition.StorageGroupSchema(req.getStorageGroup()));
-    TSStatus resp = configManager.write(plan).getStatus();
+
+    TSStatus resp = configManager.setStorageGroup(plan);
     if (resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.info("Set StorageGroup {} successful.", req.getStorageGroup());
     } else {
@@ -133,14 +134,13 @@ public class ConfigNodeRPCServerProcessor implements 
ConfigIService.Iface {
 
   @Override
   public Map<String, StorageGroupMessage> getStorageGroupsMessage() throws 
TException {
-    ConsensusReadResponse resp = configManager.read(new 
QueryStorageGroupSchemaPlan());
+    DataSet dataSet = configManager.getStorageGroupSchema();
 
-    if (resp.getDataset() == null) {
+    if (dataSet == null) {
       return new HashMap<>();
     } else {
       Map<String, StorageGroupMessage> result = new HashMap<>();
-      for (StorageGroupSchema schema :
-          ((StorageGroupSchemaDataSet) resp.getDataset()).getSchemaList()) {
+      for (StorageGroupSchema schema : ((StorageGroupSchemaDataSet) 
dataSet).getSchemaList()) {
         result.put(schema.getName(), new 
StorageGroupMessage(schema.getName()));
       }
       return result;
@@ -149,16 +149,39 @@ public class ConfigNodeRPCServerProcessor implements 
ConfigIService.Iface {
 
   @Override
   public DeviceGroupHashInfo getDeviceGroupHashInfo() throws TException {
+    return configManager.getDeviceGroupHashInfo();
+  }
+
+  @Override
+  public DataPartitionInfo applyDataPartition(GetDataPartitionReq req) throws 
TException {
     return null;
   }
 
   @Override
+  public SchemaPartitionInfo applySchemaPartition(GetSchemaPartitionReq req) 
throws TException {
+    SchemaPartitionPlan applySchemaPartitionPlan =
+        new SchemaPartitionPlan(
+            PhysicalPlanType.ApplySchemaPartition, req.getStorageGroup(), 
req.getDeviceGroupIDs());
+    DataSet dataSet = 
configManager.applySchemaPartition(applySchemaPartitionPlan);
+    ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo();
+
+    return SchemaPartitionDataSet.convertRpcSchemaPartition(
+        ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo());
+  }
+
+  @Override
   public SchemaPartitionInfo getSchemaPartition(GetSchemaPartitionReq req) 
throws TException {
-    return null;
+    SchemaPartitionPlan querySchemaPartitionPlan =
+        new SchemaPartitionPlan(
+            PhysicalPlanType.QuerySchemaPartition, req.getStorageGroup(), 
req.getDeviceGroupIDs());
+    DataSet dataSet = 
configManager.getSchemaPartition(querySchemaPartitionPlan);
+    return SchemaPartitionDataSet.convertRpcSchemaPartition(
+        ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo());
   }
 
   @Override
   public DataPartitionInfo getDataPartition(GetDataPartitionReq req) throws 
TException {
+
     return null;
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/util/SerializeDeserializeUtil.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/util/SerializeDeserializeUtil.java
new file mode 100644
index 0000000..7461827
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/util/SerializeDeserializeUtil.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers that write ints, strings and a few map shapes to a {@link ByteBuffer} and read
+ * them back. Strings are stored as a 4-byte length prefix (in bytes, -1 for null) followed by the
+ * encoded bytes; collections are stored as a 4-byte element count (-1 for a null map) followed by
+ * their elements.
+ */
+public class SerializeDeserializeUtil {
+  public static final int INT_LEN = 4;
+
+  /** Fixed charset so that written bytes decode identically regardless of platform defaults. */
+  private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+
+  private SerializeDeserializeUtil() {}
+
+  /**
+   * Reads a length-prefixed string from the buffer.
+   *
+   * @return null for a negative length prefix, "" for a zero length, the decoded string otherwise
+   */
+  public static String readString(ByteBuffer buffer) {
+    int strLength = readInt(buffer);
+    if (strLength < 0) {
+      return null;
+    } else if (strLength == 0) {
+      return "";
+    }
+    byte[] bytes = new byte[strLength];
+    buffer.get(bytes, 0, strLength);
+    return new String(bytes, 0, strLength, CHARSET);
+  }
+
+  /** Reads one int from the buffer. */
+  public static int readInt(ByteBuffer buffer) {
+    return buffer.getInt();
+  }
+
+  /**
+   * Writes a length-prefixed string to the buffer; null is written as the length -1.
+   *
+   * @return the total number of bytes written (length prefix plus encoded string)
+   */
+  public static int write(String s, ByteBuffer buffer) {
+    if (s == null) {
+      return write(-1, buffer);
+    }
+    byte[] bytes = s.getBytes(CHARSET);
+    int len = write(bytes.length, buffer);
+    buffer.put(bytes);
+    return len + bytes.length;
+  }
+
+  /**
+   * Writes an int to the buffer.
+   *
+   * @return the number of bytes used to represent n
+   */
+  public static int write(int n, ByteBuffer buffer) {
+    buffer.putInt(n);
+    return INT_LEN;
+  }
+
+  /**
+   * Writes a string-to-string map to the buffer; a null map is written as the size -1.
+   *
+   * @param map the map to write, may be null
+   * @param buffer destination buffer
+   * @return the total number of bytes written
+   */
+  public static int write(Map<String, String> map, ByteBuffer buffer) {
+    if (map == null) {
+      return write(-1, buffer);
+    }
+
+    int length = write(map.size(), buffer);
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      length += write(entry.getKey(), buffer);
+      length += write(entry.getValue(), buffer);
+    }
+    return length;
+  }
+
+  /**
+   * Reads a string-to-string map written by {@link #write(Map, ByteBuffer)}.
+   *
+   * @param buffer source buffer
+   * @return the map, or null when the stored size is -1
+   */
+  public static Map<String, String> readMap(ByteBuffer buffer) {
+    int length = readInt(buffer);
+    if (length == -1) {
+      return null;
+    }
+    Map<String, String> map = new HashMap<>(length);
+    for (int i = 0; i < length; i++) {
+      String key = readString(buffer);
+      String value = readString(buffer);
+      map.put(key, value);
+    }
+    return map;
+  }
+
+  /**
+   * Writes a map from string to list of strings; a null map is written as the size -1.
+   *
+   * @param map the map to write, may be null
+   * @param buffer destination buffer
+   * @return the total number of bytes written
+   */
+  public static int writeStringMapLists(Map<String, List<String>> map, ByteBuffer buffer) {
+    if (map == null) {
+      return write(-1, buffer);
+    }
+
+    int length = write(map.size(), buffer);
+    for (Map.Entry<String, List<String>> entry : map.entrySet()) {
+      length += write(entry.getKey(), buffer);
+      length += write(entry.getValue().size(), buffer);
+      for (String value : entry.getValue()) {
+        // write(String, ...) prefixes the encoded byte count, which is what readString expects;
+        // the character count differs from it for non-ASCII text.
+        length += write(value, buffer);
+      }
+    }
+    return length;
+  }
+
+  /**
+   * Reads a map written by {@link #writeStringMapLists(Map, ByteBuffer)}.
+   *
+   * @param buffer source buffer
+   * @return the map, or null when the stored size is -1
+   */
+  public static Map<String, List<String>> readStringMapLists(ByteBuffer buffer) {
+    int length = readInt(buffer);
+    if (length == -1) {
+      return null;
+    }
+    Map<String, List<String>> map = new HashMap<>(length);
+    for (int i = 0; i < length; i++) {
+      String key = readString(buffer);
+      int listSize = readInt(buffer);
+      List<String> valueList = new ArrayList<>();
+      for (int j = 0; j < listSize; j++) {
+        valueList.add(readString(buffer));
+      }
+      map.put(key, valueList);
+    }
+    return map;
+  }
+
+  /**
+   * Writes a map from int to list of ints; a null map is written as the size -1.
+   *
+   * @param map the map to write, may be null
+   * @param buffer destination buffer
+   * @return the total number of bytes written
+   */
+  public static int writeIntMapLists(Map<Integer, List<Integer>> map, ByteBuffer buffer) {
+    if (map == null) {
+      return write(-1, buffer);
+    }
+
+    int length = write(map.size(), buffer);
+    for (Map.Entry<Integer, List<Integer>> entry : map.entrySet()) {
+      length += write(entry.getKey(), buffer);
+      length += write(entry.getValue().size(), buffer);
+      for (Integer value : entry.getValue()) {
+        length += write(value, buffer);
+      }
+    }
+    return length;
+  }
+
+  /**
+   * Reads a map written by {@link #writeIntMapLists(Map, ByteBuffer)}.
+   *
+   * @param buffer source buffer
+   * @return the map, or null when the stored size is -1
+   */
+  public static Map<Integer, List<Integer>> readIntMapLists(ByteBuffer buffer) {
+    int length = readInt(buffer);
+    if (length == -1) {
+      return null;
+    }
+    Map<Integer, List<Integer>> map = new HashMap<>(length);
+    for (int i = 0; i < length; i++) {
+      int key = readInt(buffer);
+      int listSize = readInt(buffer);
+      List<Integer> valueList = new ArrayList<>();
+      for (int j = 0; j < listSize; j++) {
+        valueList.add(readInt(buffer));
+      }
+      map.put(key, valueList);
+    }
+    return map;
+  }
+}
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
index f99daa9..d9f9cbb 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.confignode.manager.hash;
 
-import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.ConsensusManager;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -60,7 +60,7 @@ public class DeviceGroupHashExecutorManualTest {
   }
 
   public void GeneralIndexTest() throws IOException {
-    ConfigManager manager = new ConfigManager();
+    ConsensusManager manager = new ConsensusManager();
     int[] bucket = new int[deviceGroupCount];
     Arrays.fill(bucket, 0);
 
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index 23fd30d..cf96e33 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -18,9 +18,16 @@
  */
 package org.apache.iotdb.confignode.service.thrift.server;
 
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
+import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
+import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
 import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
 import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
+import org.apache.iotdb.confignode.rpc.thrift.GetSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -28,7 +35,9 @@ import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import org.apache.thrift.TException;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -39,6 +48,16 @@ import java.util.Map;
 
 public class ConfigNodeRPCServerProcessorTest {
 
+  @Before
+  public void before() {}
+
+  @After
+  public void after() {
+    DataNodeInfoPersistence.getInstance().clear();
+    PartitionInfoPersistence.getInstance().clear();
+    RegionInfoPersistence.getInstance().clear();
+  }
+
   @Test
   public void registerDataNodeTest() throws TException, IOException {
     ConfigNodeRPCServerProcessor processor = new 
ConfigNodeRPCServerProcessor();
@@ -114,4 +133,136 @@ public class ConfigNodeRPCServerProcessorTest {
     Assert.assertNotNull(messageMap.get(sg));
     Assert.assertEquals(sg, messageMap.get(sg).getStorageGroup());
   }
+
+  /**
+   * Verifies that getDeviceGroupHashInfo reports the device group count and the hash executor
+   * class that are configured in ConfigNodeDescriptor.
+   */
+  @Test
+  public void getDeviceGroupHashInfoTest() throws TException, IOException {
+    ConfigNodeRPCServerProcessor processor = new ConfigNodeRPCServerProcessor();
+
+    // get Device Group hash
+    DeviceGroupHashInfo deviceGroupHashInfo = processor.getDeviceGroupHashInfo();
+
+    // assertEquals takes (expected, actual); the configured value is the expectation
+    Assert.assertEquals(
+        ConfigNodeDescriptor.getInstance().getConf().getDeviceGroupCount(),
+        deviceGroupHashInfo.getDeviceGroupCount());
+    Assert.assertEquals(
+        ConfigNodeDescriptor.getInstance().getConf().getDeviceGroupHashExecutorClass(),
+        deviceGroupHashInfo.getHashClass());
+  }
+
+  /**
+   * Verifies applySchemaPartition: setting a storage group fails while no DataNode is registered;
+   * after three DataNodes are registered and the storage group is set, applying a schema
+   * partition for one device group returns a non-null result whose entries for the storage group
+   * are all keyed by that device group.
+   */
+  @Test
+  public void applySchemaPartitionTest() throws TException, IOException {
+    ConfigNodeRPCServerProcessor processor = new ConfigNodeRPCServerProcessor();
+
+    final String sg = "root.sg0";
+    final Integer deviceGroupId = 1000;
+
+    // failed because there are not enough DataNodes
+    SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+    TSStatus status = processor.setStorageGroup(setReq);
+    Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
+    Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
+
+    // register DataNodes on ports 6667, 6668 and 6669
+    for (int port = 6667; port <= 6669; port++) {
+      DataNodeRegisterReq registerReq = new DataNodeRegisterReq(new EndPoint("0.0.0.0", port));
+      status = processor.registerDataNode(registerReq).getRegisterResult();
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    }
+
+    // set StorageGroup
+    status = processor.setStorageGroup(setReq);
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+    // applySchemaPartition
+    List<Integer> deviceGroupIds = new ArrayList<>();
+    deviceGroupIds.add(deviceGroupId);
+    GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
+    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+    SchemaPartitionInfo schemaPartitionInfo =
+        processor.applySchemaPartition(getSchemaPartitionReq);
+    Assert.assertNotNull(schemaPartitionInfo);
+    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+
+    // only the requested device group may appear in the result
+    for (Integer assignedDeviceGroupId :
+        schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).keySet()) {
+      Assert.assertEquals(deviceGroupId, assignedDeviceGroupId);
+    }
+  }
+
+  /**
+   * Verifies getSchemaPartition around applySchemaPartition: before a schema partition has been
+   * applied, the lookup for the device group yields no entry; after applySchemaPartition has been
+   * called for that device group, a second lookup yields a non-null entry.
+   */
+  @Test
+  public void getSchemaPartitionTest() throws TException, IOException {
+    ConfigNodeRPCServerProcessor processor = new ConfigNodeRPCServerProcessor();
+
+    final String sg = "root.sg0";
+    final Integer deviceGroupId = 1000;
+
+    // failed because there are not enough DataNodes
+    SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+    TSStatus status = processor.setStorageGroup(setReq);
+    Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
+    Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
+
+    // register DataNodes on ports 6667, 6668 and 6669
+    for (int port = 6667; port <= 6669; port++) {
+      DataNodeRegisterReq registerReq = new DataNodeRegisterReq(new EndPoint("0.0.0.0", port));
+      status = processor.registerDataNode(registerReq).getRegisterResult();
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    }
+
+    // set StorageGroup
+    status = processor.setStorageGroup(setReq);
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+    // getSchemaPartition
+    List<Integer> deviceGroupIds = new ArrayList<>();
+    deviceGroupIds.add(deviceGroupId);
+    GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
+    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+    SchemaPartitionInfo schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
+    Assert.assertNotNull(schemaPartitionInfo);
+    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+
+    // because does not apply schema partition, so schema partition is null
+    Assert.assertNull(
+        schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+
+    // applySchemaPartition, reusing the same request (one storage group, one device group);
+    // the device group id is deliberately not added to the list a second time
+    schemaPartitionInfo = processor.applySchemaPartition(getSchemaPartitionReq);
+    Assert.assertNotNull(schemaPartitionInfo);
+    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+    for (Integer assignedDeviceGroupId :
+        schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).keySet()) {
+      Assert.assertEquals(deviceGroupId, assignedDeviceGroupId);
+    }
+
+    // getSchemaPartition twice, with a freshly built request
+    deviceGroupIds = new ArrayList<>();
+    deviceGroupIds.add(deviceGroupId);
+    getSchemaPartitionReq = new GetSchemaPartitionReq();
+    getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+    schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
+    Assert.assertNotNull(schemaPartitionInfo);
+    Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+
+    // because apply schema partition, so schema partition is not null
+    Assert.assertNotNull(
+        schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+  }
 }
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/utils/SerializeDeserializeUtilTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/utils/SerializeDeserializeUtilTest.java
new file mode 100644
index 0000000..26b2d75
--- /dev/null
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/utils/SerializeDeserializeUtilTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.utils;
+
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Round-trip tests for SerializeDeserializeUtil: each test writes a map into a ByteBuffer and
+ * reads it back from a fresh buffer wrapped around the same backing array, then checks that the
+ * result equals the input.
+ */
+public class SerializeDeserializeUtilTest {
+  protected static final int DEFAULT_BUFFER_SIZE = 4096;
+
+  @Test
+  public void readWriteStringMapFromBufferTest() throws IOException {
+    // 1. read write map<String, String>
+    Map<String, String> expected = new HashMap<>();
+    expected.put("string", "string");
+
+    ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+    SerializeDeserializeUtil.write(expected, buffer);
+
+    // wrap() starts reading at position 0 of the backing array that was just written
+    Map<String, String> actual =
+        SerializeDeserializeUtil.readMap(ByteBuffer.wrap(buffer.array()));
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void readWriteStringMapListFromBufferTest() throws IOException {
+    // 2. read write map<String, List<String>>
+    List<String> values = new ArrayList<>();
+    values.add("string");
+    Map<String, List<String>> expected = new HashMap<>();
+    expected.put("string", values);
+
+    ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+    SerializeDeserializeUtil.writeStringMapLists(expected, buffer);
+
+    Map<String, List<String>> actual =
+        SerializeDeserializeUtil.readStringMapLists(ByteBuffer.wrap(buffer.array()));
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void readWriteIntMapListFromBufferTest() throws IOException {
+    // 3. read write map<Integer, List<Integer>>
+    List<Integer> values = new ArrayList<>();
+    values.add(1);
+    Map<Integer, List<Integer>> expected = new HashMap<>();
+    expected.put(1, values);
+
+    ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+    SerializeDeserializeUtil.writeIntMapLists(expected, buffer);
+
+    Map<Integer, List<Integer>> actual =
+        SerializeDeserializeUtil.readIntMapLists(ByteBuffer.wrap(buffer.array()));
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
index b01b7e4..6e3951b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
@@ -19,13 +19,16 @@
 
 package org.apache.iotdb.consensus.common;
 
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 // TODO Use a mature IDL framework such as Protobuf to manage this structure
 public class Endpoint {
 
-  private final String ip;
-  private final int port;
+  private String ip;
+  private int port;
+
+  public Endpoint() {}
 
   public Endpoint(String ip, int port) {
     this.ip = ip;
@@ -40,6 +43,23 @@ public class Endpoint {
     return port;
   }
 
+  /**
+   * Writes this endpoint into {@code buffer} as: the byte length of the ip (int), the ip bytes,
+   * then the port (int).
+   *
+   * <p>The ip is encoded as UTF-8 explicitly rather than with the platform default charset, so
+   * the bytes are the same regardless of the JVM that wrote them.
+   *
+   * @param buffer destination buffer; written at its current position
+   */
+  public void serializeImpl(ByteBuffer buffer) {
+    byte[] bytes = ip.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    buffer.putInt(bytes.length);
+    buffer.put(bytes);
+
+    buffer.putInt(port);
+  }
+
+  /**
+   * Reads the ip and port of this endpoint from {@code buffer}, in the layout produced by
+   * {@link #serializeImpl(ByteBuffer)}: ip byte length (int), ip bytes (UTF-8), port (int).
+   *
+   * @param buffer source buffer; read from its current position
+   */
+  public void deserializeImpl(ByteBuffer buffer) {
+    int length = buffer.getInt();
+    byte[] bytes = new byte[length];
+    buffer.get(bytes, 0, length);
+    // decode with the same charset that serializeImpl used to encode
+    ip = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
+
+    port = buffer.getInt();
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
index 119115c..bb629f0 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.commons.partition;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class SchemaPartitionInfo {
@@ -33,4 +35,19 @@ public class SchemaPartitionInfo {
       Map<String, Map<DeviceGroupId, SchemaRegionReplicaSet>> 
schemaPartitionInfo) {
     this.schemaPartitionInfo = schemaPartitionInfo;
   }
+
+  /**
+   * Collects the schema region replica sets already recorded for the given device groups of one
+   * storage group.
+   *
+   * @param storageGroup storage group to look up
+   * @param deviceGroupIDs device group ids to look up inside that storage group
+   * @return a map with a single entry for {@code storageGroup}; its value maps each requested
+   *     device group id that has a recorded partition to its replica set, and is empty when the
+   *     storage group is unknown or none of the device groups has a partition
+   */
+  public Map<String, Map<Integer, SchemaRegionReplicaSet>> getSchemaPartition(
+      String storageGroup, List<Integer> deviceGroupIDs) {
+    Map<String, Map<Integer, SchemaRegionReplicaSet>> storageGroupMap = new HashMap<>();
+    Map<Integer, SchemaRegionReplicaSet> deviceGroupMap = new HashMap<>();
+
+    // an unknown storage group (or an unset partition table) yields an empty inner map
+    Map<DeviceGroupId, SchemaRegionReplicaSet> recordedPartitions =
+        schemaPartitionInfo == null ? null : schemaPartitionInfo.get(storageGroup);
+    if (recordedPartitions != null) {
+      for (Integer deviceGroupID : deviceGroupIDs) {
+        // the table is keyed by DeviceGroupId, so the same key object type must be used for
+        // both the containment check and the lookup
+        DeviceGroupId key = new DeviceGroupId(deviceGroupID);
+        if (recordedPartitions.containsKey(key)) {
+          deviceGroupMap.put(deviceGroupID, recordedPartitions.get(key));
+        }
+      }
+    }
+
+    storageGroupMap.put(storageGroup, deviceGroupMap);
+    return storageGroupMap;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
index 476bc16..d1fade4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
@@ -33,7 +33,7 @@ public interface IPartitionFetcher {
 
   SchemaPartitionInfo fetchSchemaPartitionInfo(String devicePath);
 
-  SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> devicePath);
+  SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> devicePaths);
 
   PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter);
 
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index d58b449..2e3fccb 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -56,9 +56,17 @@ struct GetSchemaPartitionReq {
     2: required list<i32> deviceGroupIDs
 }
 
+// A region, identified by regionId, together with a list of endpoints.
+// NOTE(review): the endpoints are presumably the DataNodes holding the region's replicas - confirm.
+struct RegionReplicaSet {
+    1: required i32 regionId
+    2: required list<rpc.EndPoint> endpoint
+}
+
 struct SchemaPartitionInfo {
-    1: required map<i32, i32> deviceGroupSchemaRegionGroupMap
-    2: required map<i32, list<i32>> SchemaRegionGroupDataNodeMap
+    1: required map<string, map<i32, RegionReplicaSet>> 
schemaRegionDataNodesMap
+}
+
+// Data partition lookup result: a nested map whose innermost values are lists of RegionReplicaSet.
+// NOTE(review): the outer string key is presumably the storage group; the field name suggests
+// device group and start time for the inner keys, but their order (i64, then i32) should be
+// confirmed against the code that fills this struct.
+struct DataPartitionInfo {
+    1: required map<string, map<i64, map<i32, list<RegionReplicaSet>>>> 
deviceGroupStartTimeDataRegionGroupMap
 }
 
 struct GetDataPartitionReq {
@@ -66,11 +74,6 @@ struct GetDataPartitionReq {
     2: required map<i32, list<i64>> deviceGroupStartTimeMap
 }
 
-struct DataPartitionInfo {
-    1: required map<i32, map<i64, list<i32>>> 
deviceGroupStartTimeDataRegionGroupMap
-    2: required map<i32, list<i32>> dataRegionGroupDataNodeMap
-}
-
 struct DeviceGroupHashInfo {
     1: required i32 deviceGroupCount
     2: required string hashClass
@@ -131,6 +134,12 @@ service ConfigIService {
 
   DeviceGroupHashInfo getDeviceGroupHashInfo()
 
+  // apply data partition when write data
+  DataPartitionInfo applyDataPartition(GetDataPartitionReq req)
+
+  // apply schema partition when create schema
+  SchemaPartitionInfo applySchemaPartition(GetSchemaPartitionReq req)
+
   DataPartitionInfoResp fetchDataPartitionInfo(FetchDataPartitionReq req)
 
   SchemaPartitionInfoResp fetchSchemaPartitionInfo(FetchSchemaPartitionReq req)

Reply via email to