This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a30316055 [IOTDB-4062] Add schema write check above consensus layer 
(#7472)
2a30316055 is described below

commit 2a30316055d76442463fa7dbcded1eeaf13b28c1
Author: Marcos_Zyk <[email protected]>
AuthorDate: Fri Sep 30 15:15:10 2022 +0800

    [IOTDB-4062] Add schema write check above consensus layer (#7472)
---
 .../schemaregion/rocksdb/RSchemaRegion.java        |   6 +
 .../iotdb/db/exception/sql/SemanticException.java  |   4 +
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  48 ++-
 .../db/metadata/schemaregion/ISchemaRegion.java    |   3 +
 .../schemaregion/SchemaRegionMemoryImpl.java       |   6 +
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   6 +
 .../execution/executor/RegionExecutionResult.java} |  36 +-
 .../mpp/execution/executor/RegionReadExecutor.java |  78 ++++
 .../execution/executor/RegionWriteExecutor.java    | 470 +++++++++++++++++++++
 .../iotdb/db/mpp/plan/analyze/SchemaValidator.java |   2 +-
 .../plan/node/metedata/write/MeasurementGroup.java |  27 ++
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 106 +----
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 +
 .../impl/DataNodeInternalRPCServiceImpl.java       |  91 ++--
 .../service/thrift/impl/DataNodeRegionManager.java | 148 +------
 .../DataNodeInternalRPCServiceImplTest.java        |   3 +
 16 files changed, 773 insertions(+), 265 deletions(-)

diff --git 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 1a5ba6715e..153132a0a7 100644
--- 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -532,6 +532,12 @@ public class RSchemaRegion implements ISchemaRegion {
     }
   }
 
+  @Override
+  public Map<Integer, MetadataException> checkMeasurementExistence(
+      PartialPath devicePath, List<String> measurementList, List<String> 
aliasList) {
+    throw new UnsupportedOperationException();
+  }
+
   private void createEntityRecursively(String[] nodes, int start, int end, 
boolean aligned)
       throws RocksDBException, MetadataException, InterruptedException {
     if (start <= end) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java 
b/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
index 7408a4c322..2e17d0775d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
@@ -24,4 +24,8 @@ public class SemanticException extends RuntimeException {
   public SemanticException(String message) {
     super(message);
   }
+
+  public SemanticException(Throwable cause) {
+    super(cause);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index f316960b2d..3a05d885c4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -71,8 +71,10 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -383,6 +385,50 @@ public class MTreeBelowSGMemoryImpl implements 
IMTreeBelowSG {
     return new Pair<>(cur, upperTemplate);
   }
 
+  public Map<Integer, MetadataException> checkMeasurementExistence(
+      PartialPath devicePath, List<String> measurementList, List<String> 
aliasList) {
+    IMNode device = null;
+    try {
+      device = getNodeByPath(devicePath);
+    } catch (PathNotExistException e) {
+      return Collections.emptyMap();
+    }
+
+    if (!device.isEntity()) {
+      return Collections.emptyMap();
+    }
+    Map<Integer, MetadataException> failingMeasurementMap = new HashMap<>();
+    for (int i = 0; i < measurementList.size(); i++) {
+      if (device.hasChild(measurementList.get(i))) {
+        IMNode node = device.getChild(measurementList.get(i));
+        if (node.isMeasurement()) {
+          if (node.getAsMeasurementMNode().isPreDeleted()) {
+            failingMeasurementMap.put(
+                i,
+                new 
MeasurementInBlackListException(devicePath.concatNode(measurementList.get(i))));
+          } else {
+            failingMeasurementMap.put(
+                i,
+                new MeasurementAlreadyExistException(
+                    devicePath.getFullPath() + "." + measurementList.get(i),
+                    node.getAsMeasurementMNode().getMeasurementPath()));
+          }
+        } else {
+          failingMeasurementMap.put(
+              i,
+              new PathAlreadyExistException(
+                  devicePath.getFullPath() + "." + measurementList.get(i)));
+        }
+      }
+      if (aliasList != null && aliasList.get(i) != null && 
device.hasChild(aliasList.get(i))) {
+        failingMeasurementMap.put(
+            i,
+            new AliasAlreadyExistException(
+                devicePath.getFullPath() + "." + measurementList.get(i), 
aliasList.get(i)));
+      }
+    }
+    return failingMeasurementMap;
+  }
   /**
    * Delete path. The path should be a full path from root to leaf node
    *
@@ -1021,7 +1067,7 @@ public class MTreeBelowSGMemoryImpl implements 
IMTreeBelowSG {
    * @return last node in given seriesPath
    */
   @Override
-  public IMNode getNodeByPath(PartialPath path) throws MetadataException {
+  public IMNode getNodeByPath(PartialPath path) throws PathNotExistException {
     String[] nodes = path.getNodes();
     IMNode cur = storageGroupMNode;
     IMNode next;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 3a146ad3e3..64ced4db0e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -110,6 +110,9 @@ public interface ISchemaRegion {
 
   void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws 
MetadataException;
 
+  Map<Integer, MetadataException> checkMeasurementExistence(
+      PartialPath devicePath, List<String> measurementList, List<String> 
aliasList);
+
   /**
    * Delete all timeseries matching the given path pattern. If using prefix 
match, the path pattern
    * is used to match prefix path. All timeseries start with the matched 
prefix path will be
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 8f17757cc0..c0283af12b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -818,6 +818,12 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
     }
   }
 
+  @Override
+  public Map<Integer, MetadataException> checkMeasurementExistence(
+      PartialPath devicePath, List<String> measurementList, List<String> 
aliasList) {
+    return mtree.checkMeasurementExistence(devicePath, measurementList, 
aliasList);
+  }
+
   /**
    * Delete all timeseries matching the given path pattern. If using prefix 
match, the path pattern
    * is used to match prefix path. All timeseries start with the matched 
prefix path will be
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 5ee3512d29..7f7efe56b7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -729,6 +729,12 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
     }
   }
 
+  @Override
+  public Map<Integer, MetadataException> checkMeasurementExistence(
+      PartialPath devicePath, List<String> measurementList, List<String> 
aliasList) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Delete all timeseries matching the given path pattern. If using prefix 
match, the path pattern
    * is used to match prefix path. All timeseries start with the matched 
prefix path will be
diff --git 
a/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionExecutionResult.java
similarity index 56%
copy from 
server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
copy to 
server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionExecutionResult.java
index 7408a4c322..a611f7bbb1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionExecutionResult.java
@@ -17,11 +17,39 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.exception.sql;
+package org.apache.iotdb.db.mpp.execution.executor;
 
-public class SemanticException extends RuntimeException {
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 
-  public SemanticException(String message) {
-    super(message);
+public class RegionExecutionResult {
+
+  private boolean accepted;
+
+  private String message;
+
+  private TSStatus status;
+
+  public boolean isAccepted() {
+    return accepted;
+  }
+
+  public void setAccepted(boolean accepted) {
+    this.accepted = accepted;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  public TSStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(TSStatus status) {
+    this.status = status;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
new file mode 100644
index 0000000000..3fda73a847
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.executor;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.utils.SetThreadName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RegionReadExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RegionReadExecutor.class);
+
+  public RegionExecutionResult execute(
+      ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
+    // execute fragment instance in state machine
+    ConsensusReadResponse readResponse;
+    try (SetThreadName threadName = new 
SetThreadName(fragmentInstance.getId().getFullId())) {
+      if (groupId instanceof DataRegionId) {
+        readResponse = DataRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
+      } else {
+        readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
+      }
+      RegionExecutionResult resp = new RegionExecutionResult();
+      if (readResponse == null) {
+        LOGGER.error("ReadResponse is null");
+        resp.setAccepted(false);
+        resp.setMessage("ReadResponse is null");
+      } else if (!readResponse.isSuccess()) {
+        LOGGER.error(
+            "Execute FragmentInstance in ConsensusGroup {} failed.",
+            groupId,
+            readResponse.getException());
+        resp.setAccepted(false);
+        resp.setMessage(
+            "Execute FragmentInstance failed: "
+                + (readResponse.getException() == null
+                    ? ""
+                    : readResponse.getException().getMessage()));
+      } else {
+        FragmentInstanceInfo info = (FragmentInstanceInfo) 
readResponse.getDataset();
+        resp.setAccepted(!info.getState().isFailed());
+        resp.setMessage(info.getMessage());
+      }
+      return resp;
+    } catch (Throwable t) {
+      LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, t);
+      RegionExecutionResult resp = new RegionExecutionResult();
+      resp.setAccepted(false);
+      resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
+      return resp;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
new file mode 100644
index 0000000000..6e8b26f187
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.executor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
+import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RegionWriteExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RegionWriteExecutor.class);
+
+  private static final DataNodeRegionManager REGION_MANAGER = 
DataNodeRegionManager.getInstance();
+
+  public RegionExecutionResult execute(ConsensusGroupId groupId, PlanNode 
planNode) {
+    WritePlanNodeExecutionContext context =
+        new WritePlanNodeExecutionContext(groupId, 
REGION_MANAGER.getRegionLock(groupId));
+    WritePlanNodeExecutionVisitor executionVisitor = new 
WritePlanNodeExecutionVisitor();
+    return planNode.accept(executionVisitor, context);
+  }
+
+  private static class WritePlanNodeExecutionVisitor
+      extends PlanVisitor<RegionExecutionResult, 
WritePlanNodeExecutionContext> {
+
+    private final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+    @Override
+    public RegionExecutionResult visitPlan(PlanNode node, 
WritePlanNodeExecutionContext context) {
+      RegionExecutionResult response = new RegionExecutionResult();
+
+      ConsensusWriteResponse writeResponse =
+          executePlanNodeInConsensusLayer(context.getRegionId(), node);
+      // TODO need consider more status
+      if (writeResponse.getStatus() != null) {
+        response.setAccepted(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode());
+        response.setMessage(writeResponse.getStatus().message);
+        response.setStatus(writeResponse.getStatus());
+      } else {
+        LOGGER.error(
+            "Something wrong happened while calling consensus layer's write 
API.",
+            writeResponse.getException());
+        response.setAccepted(false);
+        response.setMessage(writeResponse.getException().getMessage());
+        
response.setStatus(RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR));
+      }
+      return response;
+    }
+
+    private ConsensusWriteResponse executePlanNodeInConsensusLayer(
+        ConsensusGroupId groupId, PlanNode planNode) {
+      if (groupId instanceof DataRegionId) {
+        return DataRegionConsensusImpl.getInstance().write(groupId, planNode);
+      } else {
+        return SchemaRegionConsensusImpl.getInstance().write(groupId, 
planNode);
+      }
+    }
+
+    @Override
+    public RegionExecutionResult visitInsertRow(
+        InsertRowNode node, WritePlanNodeExecutionContext context) {
+      return executeDataInsert(node, context);
+    }
+
+    @Override
+    public RegionExecutionResult visitInsertTablet(
+        InsertTabletNode node, WritePlanNodeExecutionContext context) {
+      return executeDataInsert(node, context);
+    }
+
+    @Override
+    public RegionExecutionResult visitInsertRows(
+        InsertRowsNode node, WritePlanNodeExecutionContext context) {
+      return executeDataInsert(node, context);
+    }
+
+    @Override
+    public RegionExecutionResult visitInsertMultiTablets(
+        InsertMultiTabletsNode node, WritePlanNodeExecutionContext context) {
+      return executeDataInsert(node, context);
+    }
+
+    @Override
+    public RegionExecutionResult visitInsertRowsOfOneDevice(
+        InsertRowsOfOneDeviceNode node, WritePlanNodeExecutionContext context) 
{
+      return executeDataInsert(node, context);
+    }
+
+    private RegionExecutionResult executeDataInsert(
+        InsertNode insertNode, WritePlanNodeExecutionContext context) {
+      RegionExecutionResult response = new RegionExecutionResult();
+      // data insertion should be blocked by data deletion, especially when 
deleting timeseries
+      context.getRegionWriteValidationRWLock().readLock().lock();
+      try {
+        try {
+          SchemaValidator.validate(insertNode);
+        } catch (SemanticException e) {
+          response.setAccepted(false);
+          response.setMessage(e.getMessage());
+          if (e.getCause() instanceof IoTDBException) {
+            IoTDBException ioTDBException = (IoTDBException) e.getCause();
+            response.setStatus(
+                RpcUtils.getStatus(ioTDBException.getErrorCode(), 
ioTDBException.getMessage()));
+          } else {
+            response.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
e.getMessage()));
+          }
+          return response;
+        }
+        boolean hasFailedMeasurement = insertNode.hasFailedMeasurements();
+        String partialInsertMessage = null;
+        if (hasFailedMeasurement) {
+          partialInsertMessage =
+              String.format(
+                  "Fail to insert measurements %s caused by %s",
+                  insertNode.getFailedMeasurements(), 
insertNode.getFailedMessages());
+          LOGGER.warn(partialInsertMessage);
+        }
+
+        ConsensusWriteResponse writeResponse =
+            DataRegionConsensusImpl.getInstance().write(context.getRegionId(), 
insertNode);
+
+        // TODO need consider more status
+        if (writeResponse.getStatus() != null) {
+          response.setAccepted(
+              !hasFailedMeasurement
+                  && TSStatusCode.SUCCESS_STATUS.getStatusCode()
+                      == writeResponse.getStatus().getCode());
+          if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
writeResponse.getStatus().getCode()) {
+            response.setMessage(writeResponse.getStatus().message);
+            response.setStatus(writeResponse.getStatus());
+          } else if (hasFailedMeasurement) {
+            response.setMessage(partialInsertMessage);
+            response.setStatus(
+                RpcUtils.getStatus(
+                    TSStatusCode.METADATA_ERROR.getStatusCode(), 
partialInsertMessage));
+          } else {
+            response.setMessage(writeResponse.getStatus().message);
+          }
+        } else {
+          LOGGER.error(
+              "Something wrong happened while calling consensus layer's write 
API.",
+              writeResponse.getException());
+          response.setAccepted(false);
+          response.setMessage(writeResponse.getException().getMessage());
+        }
+
+        return response;
+      } finally {
+        context.getRegionWriteValidationRWLock().readLock().unlock();
+      }
+    }
+
+    @Override
+    public RegionExecutionResult visitDeleteData(
+        DeleteDataNode node, WritePlanNodeExecutionContext context) {
+      // data deletion should block data insertion, especially when executed 
for deleting timeseries
+      context.getRegionWriteValidationRWLock().writeLock().lock();
+      try {
+        return super.visitDeleteData(node, context);
+      } finally {
+        context.getRegionWriteValidationRWLock().writeLock().unlock();
+      }
+    }
+
+    @Override
+    public RegionExecutionResult visitCreateTimeSeries(
+        CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+      ISchemaRegion schemaRegion =
+          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+      if 
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus))
 {
+        context.getRegionWriteValidationRWLock().writeLock().lock();
+        try {
+          Map<Integer, MetadataException> failingMeasurementMap =
+              schemaRegion.checkMeasurementExistence(
+                  node.getPath().getDevicePath(),
+                  Collections.singletonList(node.getPath().getMeasurement()),
+                  Collections.singletonList(node.getAlias()));
+          if (failingMeasurementMap.isEmpty()) {
+            return super.visitCreateTimeSeries(node, context);
+          } else {
+            MetadataException metadataException = failingMeasurementMap.get(0);
+            LOGGER.error("Metadata error: ", metadataException);
+            RegionExecutionResult result = new RegionExecutionResult();
+            result.setAccepted(false);
+            result.setMessage(metadataException.getMessage());
+            result.setStatus(
+                RpcUtils.getStatus(
+                    metadataException.getErrorCode(), 
metadataException.getMessage()));
+            return result;
+          }
+        } finally {
+          context.getRegionWriteValidationRWLock().writeLock().unlock();
+        }
+      } else {
+        return super.visitCreateTimeSeries(node, context);
+      }
+    }
+
+    @Override
+    public RegionExecutionResult visitCreateAlignedTimeSeries(
+        CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+      ISchemaRegion schemaRegion =
+          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+      if 
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus))
 {
+        context.getRegionWriteValidationRWLock().writeLock().lock();
+        try {
+          Map<Integer, MetadataException> failingMeasurementMap =
+              schemaRegion.checkMeasurementExistence(
+                  node.getDevicePath(), node.getMeasurements(), 
node.getAliasList());
+          if (failingMeasurementMap.isEmpty()) {
+            return super.visitCreateAlignedTimeSeries(node, context);
+          } else {
+            MetadataException metadataException = failingMeasurementMap.get(0);
+            LOGGER.error("Metadata error: ", metadataException);
+            RegionExecutionResult result = new RegionExecutionResult();
+            result.setAccepted(false);
+            result.setMessage(metadataException.getMessage());
+            result.setStatus(
+                RpcUtils.getStatus(
+                    metadataException.getErrorCode(), 
metadataException.getMessage()));
+            return result;
+          }
+        } finally {
+          context.getRegionWriteValidationRWLock().writeLock().unlock();
+        }
+      } else {
+        return super.visitCreateAlignedTimeSeries(node, context);
+      }
+    }
+
+    @Override
+    public RegionExecutionResult visitCreateMultiTimeSeries(
+        CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) 
{
+      ISchemaRegion schemaRegion =
+          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+      if 
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus))
 {
+        context.getRegionWriteValidationRWLock().writeLock().lock();
+        try {
+          List<TSStatus> failingStatus = new ArrayList<>();
+          Map<PartialPath, MeasurementGroup> measurementGroupMap = 
node.getMeasurementGroupMap();
+          List<PartialPath> emptyDeviceList = new ArrayList<>();
+          for (Map.Entry<PartialPath, MeasurementGroup> entry : 
measurementGroupMap.entrySet()) {
+            Map<Integer, MetadataException> failingMeasurementMap =
+                schemaRegion.checkMeasurementExistence(
+                    entry.getKey(),
+                    entry.getValue().getMeasurements(),
+                    entry.getValue().getAliasList());
+            if (failingMeasurementMap.isEmpty()) {
+              continue;
+            }
+
+            // filter failed measurement and keep the rest for execution
+            for (Map.Entry<Integer, MetadataException> failingMeasurement :
+                failingMeasurementMap.entrySet()) {
+              entry.getValue().removeMeasurement(failingMeasurement.getKey());
+              LOGGER.error("Metadata error: ", failingMeasurement.getValue());
+              failingStatus.add(
+                  RpcUtils.getStatus(
+                      failingMeasurement.getValue().getErrorCode(),
+                      failingMeasurement.getValue().getMessage()));
+            }
+
+            if (entry.getValue().isEmpty()) {
+              emptyDeviceList.add(entry.getKey());
+            }
+          }
+
+          for (PartialPath emptyDevice : emptyDeviceList) {
+            measurementGroupMap.remove(emptyDevice);
+          }
+
+          if (!measurementGroupMap.isEmpty()) {
+            // try registering the rest timeseries
+            RegionExecutionResult executionResult = 
super.visitCreateMultiTimeSeries(node, context);
+            if (failingStatus.isEmpty()) {
+              return executionResult;
+            }
+
+            TSStatus executionStatus = executionResult.getStatus();
+            if (executionStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+              failingStatus.addAll(executionStatus.getSubStatus());
+            } else if (executionStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              failingStatus.add(executionStatus);
+            }
+          }
+
+          TSStatus status = RpcUtils.getStatus(failingStatus);
+          RegionExecutionResult failingResult = new RegionExecutionResult();
+          failingResult.setAccepted(false);
+          failingResult.setMessage(status.getMessage());
+          failingResult.setStatus(status);
+          return failingResult;
+        } finally {
+          context.getRegionWriteValidationRWLock().writeLock().unlock();
+        }
+      } else {
+        return super.visitCreateMultiTimeSeries(node, context);
+      }
+    }
+
+    @Override
+    public RegionExecutionResult visitInternalCreateTimeSeries(
+        InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+      ISchemaRegion schemaRegion =
+          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+      if 
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus))
 {
+        context.getRegionWriteValidationRWLock().writeLock().lock();
+        try {
+          List<TSStatus> failingStatus = new ArrayList<>();
+          List<TSStatus> alreadyExistingStatus = new ArrayList<>();
+          MeasurementGroup measurementGroup = node.getMeasurementGroup();
+          Map<Integer, MetadataException> failingMeasurementMap =
+              schemaRegion.checkMeasurementExistence(
+                  node.getDevicePath(),
+                  measurementGroup.getMeasurements(),
+                  measurementGroup.getAliasList());
+          MetadataException metadataException;
+          // filter failed measurement and keep the rest for execution
+          for (Map.Entry<Integer, MetadataException> failingMeasurement :
+              failingMeasurementMap.entrySet()) {
+            metadataException = failingMeasurement.getValue();
+            if (metadataException.getErrorCode()
+                == TSStatusCode.MEASUREMENT_ALREADY_EXIST.getStatusCode()) {
+              LOGGER.info(
+                  "There's no need to internal create timeseries. {}",
+                  failingMeasurement.getValue().getMessage());
+              alreadyExistingStatus.add(
+                  RpcUtils.getStatus(
+                      metadataException.getErrorCode(), 
metadataException.getMessage()));
+            } else {
+              LOGGER.error("Metadata error: ", metadataException);
+              failingStatus.add(
+                  RpcUtils.getStatus(
+                      metadataException.getErrorCode(), 
metadataException.getMessage()));
+            }
+            measurementGroup.removeMeasurement(failingMeasurement.getKey());
+          }
+
+          RegionExecutionResult executionResult =
+              super.visitInternalCreateTimeSeries(node, context);
+
+          if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
+            return executionResult;
+          }
+
+          TSStatus executionStatus = executionResult.getStatus();
+
+          // separate the measurement_already_exist exception and other 
exceptions process,
+          // measurement_already_exist exception is acceptable due to 
concurrent timeseries creation
+          if (failingStatus.isEmpty()) {
+            if (executionStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+              if (executionStatus.getSubStatus().get(0).getCode()
+                  == TSStatusCode.MEASUREMENT_ALREADY_EXIST.getStatusCode()) {
+                // there's only measurement_already_exist exception
+                alreadyExistingStatus.addAll(executionStatus.getSubStatus());
+              } else {
+                failingStatus.addAll(executionStatus.getSubStatus());
+              }
+            } else if (executionStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              failingStatus.add(executionStatus);
+            }
+          } else {
+            if (executionStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+              if (executionStatus.getSubStatus().get(0).getCode()
+                  != TSStatusCode.MEASUREMENT_ALREADY_EXIST.getStatusCode()) {
+                failingStatus.addAll(executionStatus.getSubStatus());
+              }
+            } else if (executionStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              failingStatus.add(executionStatus);
+            }
+          }
+
+          TSStatus status;
+          if (failingStatus.isEmpty()) {
+            status = RpcUtils.getStatus(failingStatus);
+          } else {
+            status = RpcUtils.getStatus(alreadyExistingStatus);
+          }
+
+          RegionExecutionResult result = new RegionExecutionResult();
+          result.setAccepted(false);
+          result.setMessage(status.getMessage());
+          result.setStatus(status);
+          return result;
+        } finally {
+          context.getRegionWriteValidationRWLock().writeLock().unlock();
+        }
+      } else {
+        return super.visitInternalCreateTimeSeries(node, context);
+      }
+    }
+  }
+
+  private static class WritePlanNodeExecutionContext {
+
+    private final ConsensusGroupId regionId;
+
+    private final ReentrantReadWriteLock regionRWLock;
+
+    WritePlanNodeExecutionContext(ConsensusGroupId regionId, 
ReentrantReadWriteLock regionRWLock) {
+      this.regionId = regionId;
+      this.regionRWLock = regionRWLock;
+    }
+
+    public ConsensusGroupId getRegionId() {
+      return regionId;
+    }
+
+    public ReentrantReadWriteLock getRegionWriteValidationRWLock() {
+      return regionRWLock;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
index bceb07d210..391aed20dc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
@@ -63,7 +63,7 @@ public class SchemaValidator {
     try {
       insertNode.validateAndSetSchema(schemaTree);
     } catch (QueryProcessException | MetadataException e) {
-      throw new SemanticException(e.getMessage());
+      throw new SemanticException(e);
     }
 
     return schemaTree;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
index 52c5fa02c9..fa102082a9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
@@ -118,6 +118,33 @@ public class MeasurementGroup {
     attributesList.add(attributes);
   }
 
+  public void removeMeasurement(int index) {
+    measurements.remove(index);
+    dataTypes.remove(index);
+    encodings.remove(index);
+    compressors.remove(index);
+
+    if (aliasList != null) {
+      aliasList.remove(index);
+    }
+
+    if (propsList != null) {
+      propsList.remove(index);
+    }
+
+    if (tagsList != null) {
+      tagsList.remove(index);
+    }
+
+    if (attributesList != null) {
+      attributesList.remove(index);
+    }
+  }
+
+  public boolean isEmpty() {
+    return measurements.isEmpty();
+  }
+
   public void serialize(ByteBuffer byteBuffer) {
     // measurements
     ReadWriteIOUtils.write(measurements.size(), byteBuffer);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a76aad7965..4a9461612b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -24,21 +24,15 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
-import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.execution.executor.RegionExecutionResult;
+import org.apache.iotdb.db.mpp.execution.executor.RegionReadExecutor;
+import org.apache.iotdb.db.mpp.execution.executor.RegionWriteExecutor;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
@@ -177,7 +171,13 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
           TSendPlanNodeResp sendPlanNodeResp = 
client.sendPlanNode(sendPlanNodeReq);
           if (!sendPlanNodeResp.accepted) {
             logger.error(sendPlanNodeResp.getStatus().message);
-            throw new 
FragmentInstanceDispatchException(sendPlanNodeResp.getStatus());
+            if (sendPlanNodeResp.getStatus() == null) {
+              throw new FragmentInstanceDispatchException(
+                  RpcUtils.getStatus(
+                      TSStatusCode.EXECUTE_STATEMENT_ERROR, 
sendPlanNodeResp.getMessage()));
+            } else {
+              throw new 
FragmentInstanceDispatchException(sendPlanNodeResp.getStatus());
+            }
           }
           break;
         default:
@@ -215,87 +215,27 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
 
     switch (instance.getType()) {
       case READ:
-        // execute fragment instance in state machine
-        ConsensusReadResponse readResponse;
-        try {
-          if (groupId instanceof DataRegionId) {
-            readResponse = DataRegionConsensusImpl.getInstance().read(groupId, 
instance);
-          } else {
-            readResponse = 
SchemaRegionConsensusImpl.getInstance().read(groupId, instance);
-          }
-          if (readResponse == null) {
-            logger.error("ReadResponse is null");
-            throw new FragmentInstanceDispatchException(
-                RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
"ReadResponse is null"));
-          }
-        } catch (Throwable t) {
-          logger.error("Execute FragmentInstance in ConsensusGroup {} 
failed.", groupId, t);
+        RegionReadExecutor readExecutor = new RegionReadExecutor();
+        RegionExecutionResult readResult = readExecutor.execute(groupId, 
instance);
+        if (!readResult.isAccepted()) {
+          logger.error(readResult.getMessage());
           throw new FragmentInstanceDispatchException(
-              RpcUtils.getStatus(
-                  TSStatusCode.EXECUTE_STATEMENT_ERROR,
-                  "Execute FragmentInstance failed. " + t.getMessage()));
-        }
-        if (!readResponse.isSuccess()) {
-          logger.error(
-              "dispatch FragmentInstance {} locally failed. ",
-              instance,
-              readResponse.getException());
-          throw new FragmentInstanceDispatchException(
-              RpcUtils.getStatus(
-                  TSStatusCode.EXECUTE_STATEMENT_ERROR,
-                  "Execute FragmentInstance failed: "
-                      + (readResponse.getException() == null
-                          ? ""
-                          : readResponse.getException().getMessage())));
-        } else {
-          FragmentInstanceInfo info = (FragmentInstanceInfo) 
readResponse.getDataset();
-          if (info.getState().isFailed()) {
-            throw new FragmentInstanceDispatchException(
-                RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
info.getMessage()));
-          }
+              RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
readResult.getMessage()));
         }
         break;
       case WRITE:
         PlanNode planNode = instance.getFragment().getPlanNodeTree();
-        boolean hasFailedMeasurement = false;
-        String partialInsertMessage = null;
-        if (planNode instanceof InsertNode) {
-          InsertNode insertNode = (InsertNode) planNode;
-          try {
-            SchemaValidator.validate(insertNode);
-          } catch (SemanticException e) {
+        RegionWriteExecutor writeExecutor = new RegionWriteExecutor();
+        RegionExecutionResult writeResult = writeExecutor.execute(groupId, 
planNode);
+        if (!writeResult.isAccepted()) {
+          logger.error(writeResult.getMessage());
+          if (writeResult.getStatus() == null) {
             throw new FragmentInstanceDispatchException(
-                
RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), 
e.getMessage()));
-          }
-          hasFailedMeasurement = insertNode.hasFailedMeasurements();
-          if (hasFailedMeasurement) {
-            partialInsertMessage =
-                String.format(
-                    "Fail to insert measurements %s caused by %s",
-                    insertNode.getFailedMeasurements(), 
insertNode.getFailedMessages());
-            logger.warn(partialInsertMessage);
+                RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
writeResult.getMessage()));
+          } else {
+            throw new 
FragmentInstanceDispatchException(writeResult.getStatus());
           }
         }
-        ConsensusWriteResponse writeResponse;
-        if (groupId instanceof DataRegionId) {
-          writeResponse = DataRegionConsensusImpl.getInstance().write(groupId, 
planNode);
-        } else {
-          writeResponse = 
SchemaRegionConsensusImpl.getInstance().write(groupId, planNode);
-        }
-
-        if (!writeResponse.isSuccessful()) {
-          logger.error(writeResponse.getErrorMessage());
-          TSStatus failureStatus =
-              writeResponse.getStatus() != null
-                  ? writeResponse.getStatus()
-                  : RpcUtils.getStatus(
-                      TSStatusCode.EXECUTE_STATEMENT_ERROR, 
writeResponse.getErrorMessage());
-          throw new FragmentInstanceDispatchException(failureStatus);
-        } else if (hasFailedMeasurement) {
-          throw new FragmentInstanceDispatchException(
-              RpcUtils.getStatus(
-                  TSStatusCode.METADATA_ERROR.getStatusCode(), 
partialInsertMessage));
-        }
         break;
       default:
         throw new FragmentInstanceDispatchException(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 1fd5c68e1a..0b112bec46 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.db.service.basic.ServiceProvider;
 import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
 import org.apache.iotdb.db.service.metrics.MetricService;
 import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.utils.WALMode;
@@ -304,6 +305,9 @@ public class DataNode implements DataNodeMBean {
       }
     }
 
+    // must init after SchemaEngine and StorageEngine prepared well
+    DataNodeRegionManager.getInstance().init();
+
     registerManager.register(SyncService.getInstance());
     registerManager.register(UpgradeSevice.getINSTANCE());
     // in mpp mode we temporarily don't start settle service because it uses 
StorageEngine directly
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index a3bb7f546d..33d8b9de14 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -42,7 +42,6 @@ import 
org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -61,6 +60,9 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUpdateType;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.executor.RegionExecutionResult;
+import org.apache.iotdb.db.mpp.execution.executor.RegionReadExecutor;
+import org.apache.iotdb.db.mpp.execution.executor.RegionWriteExecutor;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
@@ -158,11 +160,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
   private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
 
-  private final DataNodeRegionManager regionManager;
+  private final DataNodeRegionManager regionManager = 
DataNodeRegionManager.getInstance();
 
   public DataNodeInternalRPCServiceImpl() {
     super();
-    regionManager = new DataNodeRegionManager(schemaEngine, storageEngine);
   }
 
   @Override
@@ -192,39 +193,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       return resp;
     }
 
-    // execute fragment instance in state machine
-    ConsensusReadResponse readResponse;
-    try (SetThreadName threadName = new 
SetThreadName(fragmentInstance.getId().getFullId())) {
-      if (groupId instanceof DataRegionId) {
-        readResponse = DataRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
-      } else {
-        readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
-      }
-      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
-      if (!readResponse.isSuccess()) {
-        LOGGER.error(
-            "Execute FragmentInstance in ConsensusGroup {} failed.",
-            req.getConsensusGroupId(),
-            readResponse.getException());
-        resp.setAccepted(false);
-        resp.setMessage(
-            "Execute FragmentInstance failed: "
-                + (readResponse.getException() == null
-                    ? ""
-                    : readResponse.getException().getMessage()));
-      } else {
-        FragmentInstanceInfo info = (FragmentInstanceInfo) 
readResponse.getDataset();
-        resp.setAccepted(!info.getState().isFailed());
-        resp.setMessage(info.getMessage());
-      }
-      return resp;
-    } catch (Throwable t) {
-      LOGGER.error(
-          "Execute FragmentInstance in ConsensusGroup {} failed.", 
req.getConsensusGroupId(), t);
-      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
-      resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
-      return resp;
-    }
+    RegionReadExecutor executor = new RegionReadExecutor();
+    RegionExecutionResult executionResult = executor.execute(groupId, 
fragmentInstance);
+    TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
+    resp.setAccepted(executionResult.isAccepted());
+    resp.setMessage(executionResult.getMessage());
+
+    return resp;
   }
 
   @Override
@@ -233,7 +208,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
-    return regionManager.executePlanNode(groupId, planNode);
+    RegionWriteExecutor executor = new RegionWriteExecutor();
+    TSendPlanNodeResp resp = new TSendPlanNodeResp();
+    RegionExecutionResult executionResult = executor.execute(groupId, 
planNode);
+    resp.setAccepted(executionResult.isAccepted());
+    resp.setMessage(executionResult.getMessage());
+    resp.setStatus(executionResult.getStatus());
+    return resp;
   }
 
   @Override
@@ -355,10 +336,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       if (filteredPatternTree.isEmpty()) {
         continue;
       }
+      RegionWriteExecutor executor = new RegionWriteExecutor();
       status =
-          regionManager.executeSchemaPlanNode(
-              new SchemaRegionId(consensusGroupId.getId()),
-              new ConstructSchemaBlackListNode(new PlanNodeId(""), 
filteredPatternTree));
+          executor
+              .execute(
+                  new SchemaRegionId(consensusGroupId.getId()),
+                  new ConstructSchemaBlackListNode(new PlanNodeId(""), 
filteredPatternTree))
+              .getStatus();
       if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         preDeletedNum += Integer.parseInt(status.getMessage());
       } else {
@@ -388,10 +372,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       if (filteredPatternTree.isEmpty()) {
         continue;
       }
+      RegionWriteExecutor executor = new RegionWriteExecutor();
       status =
-          regionManager.executeSchemaPlanNode(
-              new SchemaRegionId(consensusGroupId.getId()),
-              new RollbackSchemaBlackListNode(new PlanNodeId(""), 
filteredPatternTree));
+          executor
+              .execute(
+                  new SchemaRegionId(consensusGroupId.getId()),
+                  new RollbackSchemaBlackListNode(new PlanNodeId(""), 
filteredPatternTree))
+              .getStatus();
       if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         failureList.add(status);
       }
@@ -465,10 +452,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     List<TSStatus> failureList = new ArrayList<>();
     TSStatus status;
     for (TConsensusGroupId consensusGroupId : req.getDataRegionIdList()) {
+      RegionWriteExecutor executor = new RegionWriteExecutor();
       status =
-          regionManager.executeDeleteDataForDeleteTimeSeries(
-              new DataRegionId(consensusGroupId.getId()),
-              new DeleteDataNode(new PlanNodeId(""), pathList, Long.MIN_VALUE, 
Long.MAX_VALUE));
+          executor
+              .execute(
+                  new DataRegionId(consensusGroupId.getId()),
+                  new DeleteDataNode(new PlanNodeId(""), pathList, 
Long.MIN_VALUE, Long.MAX_VALUE))
+              .getStatus();
       if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         failureList.add(status);
       }
@@ -496,10 +486,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       if (filteredPatternTree.isEmpty()) {
         continue;
       }
+      RegionWriteExecutor executor = new RegionWriteExecutor();
       status =
-          regionManager.executeSchemaPlanNode(
-              new SchemaRegionId(consensusGroupId.getId()),
-              new DeleteTimeSeriesNode(new PlanNodeId(""), 
filteredPatternTree));
+          executor
+              .execute(
+                  new SchemaRegionId(consensusGroupId.getId()),
+                  new DeleteTimeSeriesNode(new PlanNodeId(""), 
filteredPatternTree))
+              .getStatus();
       if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         failureList.add(status);
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
index e82b25f1f1..3a9ecdbc0a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
@@ -32,17 +32,11 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -63,17 +57,25 @@ public class DataNodeRegionManager {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNodeRegionManager.class);
 
-  private final SchemaEngine schemaEngine;
-  private final StorageEngineV2 storageEngine;
+  private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
+  private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
 
   private final Map<SchemaRegionId, ReentrantReadWriteLock> 
schemaRegionLockMap =
       new ConcurrentHashMap<>();
   private final Map<DataRegionId, ReentrantReadWriteLock> dataRegionLockMap =
       new ConcurrentHashMap<>();
 
-  public DataNodeRegionManager(SchemaEngine schemaEngine, StorageEngineV2 
storageEngine) {
-    this.schemaEngine = schemaEngine;
-    this.storageEngine = storageEngine;
/**
 * Initialization-on-demand holder: the singleton is created on the first call to
 * {@link #getInstance()} rather than at enclosing-class load time.
 */
private static class DataNodeRegionManagerHolder {
  private static final DataNodeRegionManager INSTANCE = new DataNodeRegionManager();

  private DataNodeRegionManagerHolder() {}
}

/** Returns the process-wide {@code DataNodeRegionManager} singleton. */
public static DataNodeRegionManager getInstance() {
  return DataNodeRegionManagerHolder.INSTANCE;
}
+
+  public void init() {
     schemaEngine
         .getAllSchemaRegions()
         .forEach(
@@ -88,125 +90,17 @@ public class DataNodeRegionManager {
             dataRegionId -> dataRegionLockMap.put(dataRegionId, new 
ReentrantReadWriteLock(false)));
   }
 
-  public TSendPlanNodeResp executePlanNode(ConsensusGroupId groupId, PlanNode 
planNode) {
-    if (planNode instanceof InsertNode) {
-      return executeDataInsert((DataRegionId) groupId, (InsertNode) planNode);
-    } else {
-      TSendPlanNodeResp response = new TSendPlanNodeResp();
-      ConsensusWriteResponse writeResponse = 
executePlanNodeInConsensusLayer(groupId, planNode);
-      // TODO need consider more status
-      if (writeResponse.getStatus() != null) {
-        response.setAccepted(
-            TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode());
-        response.setMessage(writeResponse.getStatus().message);
-        response.setStatus(writeResponse.getStatus());
-      } else {
-        LOGGER.error(
-            "Something wrong happened while calling consensus layer's write 
API.",
-            writeResponse.getException());
-        response.setAccepted(false);
-        response.setMessage(writeResponse.getException().getMessage());
-      }
-      return response;
-    }
+  public void clear() {
+    schemaRegionLockMap.clear();
+    dataRegionLockMap.clear();
   }
 
-  private ConsensusWriteResponse executePlanNodeInConsensusLayer(
-      ConsensusGroupId groupId, PlanNode planNode) {
-    if (groupId instanceof DataRegionId) {
-      return DataRegionConsensusImpl.getInstance().write(groupId, planNode);
-    } else {
-      return SchemaRegionConsensusImpl.getInstance().write(groupId, planNode);
-    }
-  }
-
-  private TSendPlanNodeResp executeDataInsert(DataRegionId dataRegionId, 
InsertNode insertNode) {
-    TSendPlanNodeResp response = new TSendPlanNodeResp();
-    dataRegionLockMap.get(dataRegionId).readLock().lock();
-    try {
-      try {
-        SchemaValidator.validate(insertNode);
-      } catch (SemanticException e) {
-        response.setAccepted(false);
-        response.setStatus(
-            RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), 
e.getMessage()));
-        response.setMessage(e.getMessage());
-        return response;
-      }
-      boolean hasFailedMeasurement = insertNode.hasFailedMeasurements();
-      String partialInsertMessage = null;
-      if (hasFailedMeasurement) {
-        partialInsertMessage =
-            String.format(
-                "Fail to insert measurements %s caused by %s",
-                insertNode.getFailedMeasurements(), 
insertNode.getFailedMessages());
-        LOGGER.warn(partialInsertMessage);
-      }
-
-      ConsensusWriteResponse writeResponse =
-          executePlanNodeInConsensusLayer(dataRegionId, insertNode);
+  private DataNodeRegionManager() {}
 
-      // TODO need consider more status
-      if (writeResponse.getStatus() != null) {
-        response.setAccepted(
-            !hasFailedMeasurement
-                && TSStatusCode.SUCCESS_STATUS.getStatusCode()
-                    == writeResponse.getStatus().getCode());
-        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
writeResponse.getStatus().getCode()) {
-          response.setMessage(writeResponse.getStatus().message);
-          response.setStatus(writeResponse.getStatus());
-        } else if (hasFailedMeasurement) {
-          response.setMessage(partialInsertMessage);
-          response.setStatus(
-              RpcUtils.getStatus(
-                  TSStatusCode.METADATA_ERROR.getStatusCode(), 
partialInsertMessage));
-        } else {
-          response.setMessage(writeResponse.getStatus().message);
-        }
-      } else {
-        LOGGER.error(
-            "Something wrong happened while calling consensus layer's write 
API.",
-            writeResponse.getException());
-        response.setAccepted(false);
-        response.setMessage(writeResponse.getException().getMessage());
-      }
-
-      return response;
-    } finally {
-      dataRegionLockMap.get(dataRegionId).readLock().unlock();
-    }
-  }
-
-  public TSStatus executeSchemaPlanNode(SchemaRegionId schemaRegionId, 
PlanNode planNode) {
-    ConsensusWriteResponse writeResponse =
-        executePlanNodeInConsensusLayer(schemaRegionId, planNode);
-    TSStatus status = writeResponse.getStatus();
-    if (status == null) {
-      LOGGER.error(
-          "Something wrong happened while calling consensus layer's write 
API.",
-          writeResponse.getException());
-      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-    return status;
-  }
-
-  public TSStatus executeDeleteDataForDeleteTimeSeries(
-      DataRegionId dataRegionId, PlanNode planNode) {
-    dataRegionLockMap.get(dataRegionId).writeLock().lock();
-    try {
-      ConsensusWriteResponse writeResponse =
-          executePlanNodeInConsensusLayer(dataRegionId, planNode);
-      TSStatus status = writeResponse.getStatus();
-      if (status == null) {
-        LOGGER.error(
-            "Something wrong happened while calling consensus layer's write 
API.",
-            writeResponse.getException());
-        return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
-      }
-      return status;
-    } finally {
-      dataRegionLockMap.get(dataRegionId).writeLock().unlock();
-    }
+  public ReentrantReadWriteLock getRegionLock(ConsensusGroupId 
consensusGroupId) {
+    return consensusGroupId instanceof DataRegionId
+        ? dataRegionLockMap.get((DataRegionId) consensusGroupId)
+        : schemaRegionLockMap.get((SchemaRegionId) consensusGroupId);
   }
 
   public TSStatus createSchemaRegion(TRegionReplicaSet regionReplicaSet, 
String storageGroup) {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
 
b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index edc327ad7c..e3008808d0 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlign
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.service.thrift.impl.DataNodeInternalRPCServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
 import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
@@ -76,6 +77,7 @@ public class DataNodeInternalRPCServiceImplTest {
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new 
PartialPath("root.ln"));
     DataRegionConsensusImpl.setupAndGetInstance().start();
     SchemaRegionConsensusImpl.setupAndGetInstance().start();
+    DataNodeRegionManager.getInstance().init();
   }
 
   @Before
@@ -99,6 +101,7 @@ public class DataNodeInternalRPCServiceImplTest {
 
   @AfterClass
   public static void tearDownAfterClass() throws IOException, 
StorageEngineException {
+    DataNodeRegionManager.getInstance().clear();
     DataRegionConsensusImpl.getInstance().stop();
     SchemaRegionConsensusImpl.getInstance().stop();
     IoTDB.configManager.clear();

Reply via email to