This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2a30316055 [IOTDB-4062] Add schema write check above consensus layer
(#7472)
2a30316055 is described below
commit 2a30316055d76442463fa7dbcded1eeaf13b28c1
Author: Marcos_Zyk <[email protected]>
AuthorDate: Fri Sep 30 15:15:10 2022 +0800
[IOTDB-4062] Add schema write check above consensus layer (#7472)
---
.../schemaregion/rocksdb/RSchemaRegion.java | 6 +
.../iotdb/db/exception/sql/SemanticException.java | 4 +
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 48 ++-
.../db/metadata/schemaregion/ISchemaRegion.java | 3 +
.../schemaregion/SchemaRegionMemoryImpl.java | 6 +
.../schemaregion/SchemaRegionSchemaFileImpl.java | 6 +
.../execution/executor/RegionExecutionResult.java} | 36 +-
.../mpp/execution/executor/RegionReadExecutor.java | 78 ++++
.../execution/executor/RegionWriteExecutor.java | 470 +++++++++++++++++++++
.../iotdb/db/mpp/plan/analyze/SchemaValidator.java | 2 +-
.../plan/node/metedata/write/MeasurementGroup.java | 27 ++
.../scheduler/FragmentInstanceDispatcherImpl.java | 106 +----
.../java/org/apache/iotdb/db/service/DataNode.java | 4 +
.../impl/DataNodeInternalRPCServiceImpl.java | 91 ++--
.../service/thrift/impl/DataNodeRegionManager.java | 148 +------
.../DataNodeInternalRPCServiceImplTest.java | 3 +
16 files changed, 773 insertions(+), 265 deletions(-)
diff --git
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 1a5ba6715e..153132a0a7 100644
---
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -532,6 +532,12 @@ public class RSchemaRegion implements ISchemaRegion {
}
}
+ @Override
+ public Map<Integer, MetadataException> checkMeasurementExistence(
+ PartialPath devicePath, List<String> measurementList, List<String>
aliasList) {
+ throw new UnsupportedOperationException();
+ }
+
private void createEntityRecursively(String[] nodes, int start, int end,
boolean aligned)
throws RocksDBException, MetadataException, InterruptedException {
if (start <= end) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
b/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
index 7408a4c322..2e17d0775d 100644
---
a/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
@@ -24,4 +24,8 @@ public class SemanticException extends RuntimeException {
public SemanticException(String message) {
super(message);
}
+
+ public SemanticException(Throwable cause) {
+ super(cause);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index f316960b2d..3a05d885c4 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -71,8 +71,10 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -383,6 +385,50 @@ public class MTreeBelowSGMemoryImpl implements
IMTreeBelowSG {
return new Pair<>(cur, upperTemplate);
}
+ public Map<Integer, MetadataException> checkMeasurementExistence(
+ PartialPath devicePath, List<String> measurementList, List<String>
aliasList) {
+ IMNode device = null;
+ try {
+ device = getNodeByPath(devicePath);
+ } catch (PathNotExistException e) {
+ return Collections.emptyMap();
+ }
+
+ if (!device.isEntity()) {
+ return Collections.emptyMap();
+ }
+ Map<Integer, MetadataException> failingMeasurementMap = new HashMap<>();
+ for (int i = 0; i < measurementList.size(); i++) {
+ if (device.hasChild(measurementList.get(i))) {
+ IMNode node = device.getChild(measurementList.get(i));
+ if (node.isMeasurement()) {
+ if (node.getAsMeasurementMNode().isPreDeleted()) {
+ failingMeasurementMap.put(
+ i,
+ new
MeasurementInBlackListException(devicePath.concatNode(measurementList.get(i))));
+ } else {
+ failingMeasurementMap.put(
+ i,
+ new MeasurementAlreadyExistException(
+ devicePath.getFullPath() + "." + measurementList.get(i),
+ node.getAsMeasurementMNode().getMeasurementPath()));
+ }
+ } else {
+ failingMeasurementMap.put(
+ i,
+ new PathAlreadyExistException(
+ devicePath.getFullPath() + "." + measurementList.get(i)));
+ }
+ }
+ if (aliasList != null && aliasList.get(i) != null &&
device.hasChild(aliasList.get(i))) {
+ failingMeasurementMap.put(
+ i,
+ new AliasAlreadyExistException(
+ devicePath.getFullPath() + "." + measurementList.get(i),
aliasList.get(i)));
+ }
+ }
+ return failingMeasurementMap;
+ }
/**
* Delete path. The path should be a full path from root to leaf node
*
@@ -1021,7 +1067,7 @@ public class MTreeBelowSGMemoryImpl implements
IMTreeBelowSG {
* @return last node in given seriesPath
*/
@Override
- public IMNode getNodeByPath(PartialPath path) throws MetadataException {
+ public IMNode getNodeByPath(PartialPath path) throws PathNotExistException {
String[] nodes = path.getNodes();
IMNode cur = storageGroupMNode;
IMNode next;
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 3a146ad3e3..64ced4db0e 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -110,6 +110,9 @@ public interface ISchemaRegion {
void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws
MetadataException;
+ Map<Integer, MetadataException> checkMeasurementExistence(
+ PartialPath devicePath, List<String> measurementList, List<String>
aliasList);
+
/**
* Delete all timeseries matching the given path pattern. If using prefix
match, the path pattern
* is used to match prefix path. All timeseries start with the matched
prefix path will be
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 8f17757cc0..c0283af12b 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -818,6 +818,12 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
}
}
+ @Override
+ public Map<Integer, MetadataException> checkMeasurementExistence(
+ PartialPath devicePath, List<String> measurementList, List<String>
aliasList) {
+ return mtree.checkMeasurementExistence(devicePath, measurementList,
aliasList);
+ }
+
/**
* Delete all timeseries matching the given path pattern. If using prefix
match, the path pattern
* is used to match prefix path. All timeseries start with the matched
prefix path will be
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 5ee3512d29..7f7efe56b7 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -729,6 +729,12 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
}
}
+ @Override
+ public Map<Integer, MetadataException> checkMeasurementExistence(
+ PartialPath devicePath, List<String> measurementList, List<String>
aliasList) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Delete all timeseries matching the given path pattern. If using prefix
match, the path pattern
* is used to match prefix path. All timeseries start with the matched
prefix path will be
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionExecutionResult.java
similarity index 56%
copy from
server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
copy to
server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionExecutionResult.java
index 7408a4c322..a611f7bbb1 100644
---
a/server/src/main/java/org/apache/iotdb/db/exception/sql/SemanticException.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionExecutionResult.java
@@ -17,11 +17,39 @@
* under the License.
*/
-package org.apache.iotdb.db.exception.sql;
+package org.apache.iotdb.db.mpp.execution.executor;
-public class SemanticException extends RuntimeException {
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
- public SemanticException(String message) {
- super(message);
+public class RegionExecutionResult {
+
+ private boolean accepted;
+
+ private String message;
+
+ private TSStatus status;
+
+ public boolean isAccepted() {
+ return accepted;
+ }
+
+ public void setAccepted(boolean accepted) {
+ this.accepted = accepted;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public TSStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(TSStatus status) {
+ this.status = status;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
new file mode 100644
index 0000000000..3fda73a847
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.executor;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.utils.SetThreadName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RegionReadExecutor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RegionReadExecutor.class);
+
+ public RegionExecutionResult execute(
+ ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
+ // execute fragment instance in state machine
+ ConsensusReadResponse readResponse;
+ try (SetThreadName threadName = new
SetThreadName(fragmentInstance.getId().getFullId())) {
+ if (groupId instanceof DataRegionId) {
+ readResponse = DataRegionConsensusImpl.getInstance().read(groupId,
fragmentInstance);
+ } else {
+ readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId,
fragmentInstance);
+ }
+ RegionExecutionResult resp = new RegionExecutionResult();
+ if (readResponse == null) {
+ LOGGER.error("ReadResponse is null");
+ resp.setAccepted(false);
+ resp.setMessage("ReadResponse is null");
+ } else if (!readResponse.isSuccess()) {
+ LOGGER.error(
+ "Execute FragmentInstance in ConsensusGroup {} failed.",
+ groupId,
+ readResponse.getException());
+ resp.setAccepted(false);
+ resp.setMessage(
+ "Execute FragmentInstance failed: "
+ + (readResponse.getException() == null
+ ? ""
+ : readResponse.getException().getMessage()));
+ } else {
+ FragmentInstanceInfo info = (FragmentInstanceInfo)
readResponse.getDataset();
+ resp.setAccepted(!info.getState().isFailed());
+ resp.setMessage(info.getMessage());
+ }
+ return resp;
+ } catch (Throwable t) {
+ LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.",
groupId, t);
+ RegionExecutionResult resp = new RegionExecutionResult();
+ resp.setAccepted(false);
+ resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
+ return resp;
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
new file mode 100644
index 0000000000..6e8b26f187
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.executor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
+import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RegionWriteExecutor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RegionWriteExecutor.class);
+
+ private static final DataNodeRegionManager REGION_MANAGER =
DataNodeRegionManager.getInstance();
+
+ public RegionExecutionResult execute(ConsensusGroupId groupId, PlanNode
planNode) {
+ WritePlanNodeExecutionContext context =
+ new WritePlanNodeExecutionContext(groupId,
REGION_MANAGER.getRegionLock(groupId));
+ WritePlanNodeExecutionVisitor executionVisitor = new
WritePlanNodeExecutionVisitor();
+ return planNode.accept(executionVisitor, context);
+ }
+
+ private static class WritePlanNodeExecutionVisitor
+ extends PlanVisitor<RegionExecutionResult,
WritePlanNodeExecutionContext> {
+
+ private final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+
+ @Override
+ public RegionExecutionResult visitPlan(PlanNode node,
WritePlanNodeExecutionContext context) {
+ RegionExecutionResult response = new RegionExecutionResult();
+
+ ConsensusWriteResponse writeResponse =
+ executePlanNodeInConsensusLayer(context.getRegionId(), node);
+ // TODO need consider more status
+ if (writeResponse.getStatus() != null) {
+ response.setAccepted(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
writeResponse.getStatus().getCode());
+ response.setMessage(writeResponse.getStatus().message);
+ response.setStatus(writeResponse.getStatus());
+ } else {
+ LOGGER.error(
+ "Something wrong happened while calling consensus layer's write
API.",
+ writeResponse.getException());
+ response.setAccepted(false);
+ response.setMessage(writeResponse.getException().getMessage());
+
response.setStatus(RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR));
+ }
+ return response;
+ }
+
+ private ConsensusWriteResponse executePlanNodeInConsensusLayer(
+ ConsensusGroupId groupId, PlanNode planNode) {
+ if (groupId instanceof DataRegionId) {
+ return DataRegionConsensusImpl.getInstance().write(groupId, planNode);
+ } else {
+ return SchemaRegionConsensusImpl.getInstance().write(groupId,
planNode);
+ }
+ }
+
+ @Override
+ public RegionExecutionResult visitInsertRow(
+ InsertRowNode node, WritePlanNodeExecutionContext context) {
+ return executeDataInsert(node, context);
+ }
+
+ @Override
+ public RegionExecutionResult visitInsertTablet(
+ InsertTabletNode node, WritePlanNodeExecutionContext context) {
+ return executeDataInsert(node, context);
+ }
+
+ @Override
+ public RegionExecutionResult visitInsertRows(
+ InsertRowsNode node, WritePlanNodeExecutionContext context) {
+ return executeDataInsert(node, context);
+ }
+
+ @Override
+ public RegionExecutionResult visitInsertMultiTablets(
+ InsertMultiTabletsNode node, WritePlanNodeExecutionContext context) {
+ return executeDataInsert(node, context);
+ }
+
+ @Override
+ public RegionExecutionResult visitInsertRowsOfOneDevice(
+ InsertRowsOfOneDeviceNode node, WritePlanNodeExecutionContext context)
{
+ return executeDataInsert(node, context);
+ }
+
+ private RegionExecutionResult executeDataInsert(
+ InsertNode insertNode, WritePlanNodeExecutionContext context) {
+ RegionExecutionResult response = new RegionExecutionResult();
+ // data insertion should be blocked by data deletion, especially when
deleting timeseries
+ context.getRegionWriteValidationRWLock().readLock().lock();
+ try {
+ try {
+ SchemaValidator.validate(insertNode);
+ } catch (SemanticException e) {
+ response.setAccepted(false);
+ response.setMessage(e.getMessage());
+ if (e.getCause() instanceof IoTDBException) {
+ IoTDBException ioTDBException = (IoTDBException) e.getCause();
+ response.setStatus(
+ RpcUtils.getStatus(ioTDBException.getErrorCode(),
ioTDBException.getMessage()));
+ } else {
+ response.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
e.getMessage()));
+ }
+ return response;
+ }
+ boolean hasFailedMeasurement = insertNode.hasFailedMeasurements();
+ String partialInsertMessage = null;
+ if (hasFailedMeasurement) {
+ partialInsertMessage =
+ String.format(
+ "Fail to insert measurements %s caused by %s",
+ insertNode.getFailedMeasurements(),
insertNode.getFailedMessages());
+ LOGGER.warn(partialInsertMessage);
+ }
+
+ ConsensusWriteResponse writeResponse =
+ DataRegionConsensusImpl.getInstance().write(context.getRegionId(),
insertNode);
+
+ // TODO need consider more status
+ if (writeResponse.getStatus() != null) {
+ response.setAccepted(
+ !hasFailedMeasurement
+ && TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ == writeResponse.getStatus().getCode());
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
writeResponse.getStatus().getCode()) {
+ response.setMessage(writeResponse.getStatus().message);
+ response.setStatus(writeResponse.getStatus());
+ } else if (hasFailedMeasurement) {
+ response.setMessage(partialInsertMessage);
+ response.setStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.METADATA_ERROR.getStatusCode(),
partialInsertMessage));
+ } else {
+ response.setMessage(writeResponse.getStatus().message);
+ }
+ } else {
+ LOGGER.error(
+ "Something wrong happened while calling consensus layer's write
API.",
+ writeResponse.getException());
+ response.setAccepted(false);
+ response.setMessage(writeResponse.getException().getMessage());
+ }
+
+ return response;
+ } finally {
+ context.getRegionWriteValidationRWLock().readLock().unlock();
+ }
+ }
+
+ @Override
+ public RegionExecutionResult visitDeleteData(
+ DeleteDataNode node, WritePlanNodeExecutionContext context) {
+ // data deletion should block data insertion, especially when executed
for deleting timeseries
+ context.getRegionWriteValidationRWLock().writeLock().lock();
+ try {
+ return super.visitDeleteData(node, context);
+ } finally {
+ context.getRegionWriteValidationRWLock().writeLock().unlock();
+ }
+ }
+
+ @Override
+ public RegionExecutionResult visitCreateTimeSeries(
+ CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+ ISchemaRegion schemaRegion =
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ if
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus))
{
+ context.getRegionWriteValidationRWLock().writeLock().lock();
+ try {
+ Map<Integer, MetadataException> failingMeasurementMap =
+ schemaRegion.checkMeasurementExistence(
+ node.getPath().getDevicePath(),
+ Collections.singletonList(node.getPath().getMeasurement()),
+ Collections.singletonList(node.getAlias()));
+ if (failingMeasurementMap.isEmpty()) {
+ return super.visitCreateTimeSeries(node, context);
+ } else {
+ MetadataException metadataException = failingMeasurementMap.get(0);
+ LOGGER.error("Metadata error: ", metadataException);
+ RegionExecutionResult result = new RegionExecutionResult();
+ result.setAccepted(false);
+ result.setMessage(metadataException.getMessage());
+ result.setStatus(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(),
metadataException.getMessage()));
+ return result;
+ }
+ } finally {
+ context.getRegionWriteValidationRWLock().writeLock().unlock();
+ }
+ } else {
+ return super.visitCreateTimeSeries(node, context);
+ }
+ }
+
+ @Override
+ public RegionExecutionResult visitCreateAlignedTimeSeries(
+ CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ ISchemaRegion schemaRegion =
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ if
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus))
{
+ context.getRegionWriteValidationRWLock().writeLock().lock();
+ try {
+ Map<Integer, MetadataException> failingMeasurementMap =
+ schemaRegion.checkMeasurementExistence(
+ node.getDevicePath(), node.getMeasurements(),
node.getAliasList());
+ if (failingMeasurementMap.isEmpty()) {
+ return super.visitCreateAlignedTimeSeries(node, context);
+ } else {
+ MetadataException metadataException = failingMeasurementMap.get(0);
+ LOGGER.error("Metadata error: ", metadataException);
+ RegionExecutionResult result = new RegionExecutionResult();
+ result.setAccepted(false);
+ result.setMessage(metadataException.getMessage());
+ result.setStatus(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(),
metadataException.getMessage()));
+ return result;
+ }
+ } finally {
+ context.getRegionWriteValidationRWLock().writeLock().unlock();
+ }
+ } else {
+ return super.visitCreateAlignedTimeSeries(node, context);
+ }
+ }
+
+ @Override
+ public RegionExecutionResult visitCreateMultiTimeSeries(
+ CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context)
{
+ ISchemaRegion schemaRegion =
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ if
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus))
{
+ context.getRegionWriteValidationRWLock().writeLock().lock();
+ try {
+ List<TSStatus> failingStatus = new ArrayList<>();
+ Map<PartialPath, MeasurementGroup> measurementGroupMap =
node.getMeasurementGroupMap();
+ List<PartialPath> emptyDeviceList = new ArrayList<>();
+ for (Map.Entry<PartialPath, MeasurementGroup> entry :
measurementGroupMap.entrySet()) {
+ Map<Integer, MetadataException> failingMeasurementMap =
+ schemaRegion.checkMeasurementExistence(
+ entry.getKey(),
+ entry.getValue().getMeasurements(),
+ entry.getValue().getAliasList());
+ if (failingMeasurementMap.isEmpty()) {
+ continue;
+ }
+
+ // filter failed measurement and keep the rest for execution
+ for (Map.Entry<Integer, MetadataException> failingMeasurement :
+ failingMeasurementMap.entrySet()) {
+ entry.getValue().removeMeasurement(failingMeasurement.getKey());
+ LOGGER.error("Metadata error: ", failingMeasurement.getValue());
+ failingStatus.add(
+ RpcUtils.getStatus(
+ failingMeasurement.getValue().getErrorCode(),
+ failingMeasurement.getValue().getMessage()));
+ }
+
+ if (entry.getValue().isEmpty()) {
+ emptyDeviceList.add(entry.getKey());
+ }
+ }
+
+ for (PartialPath emptyDevice : emptyDeviceList) {
+ measurementGroupMap.remove(emptyDevice);
+ }
+
+ if (!measurementGroupMap.isEmpty()) {
+ // try registering the rest timeseries
+ RegionExecutionResult executionResult =
super.visitCreateMultiTimeSeries(node, context);
+ if (failingStatus.isEmpty()) {
+ return executionResult;
+ }
+
+ TSStatus executionStatus = executionResult.getStatus();
+ if (executionStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ failingStatus.addAll(executionStatus.getSubStatus());
+ } else if (executionStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failingStatus.add(executionStatus);
+ }
+ }
+
+ TSStatus status = RpcUtils.getStatus(failingStatus);
+ RegionExecutionResult failingResult = new RegionExecutionResult();
+ failingResult.setAccepted(false);
+ failingResult.setMessage(status.getMessage());
+ failingResult.setStatus(status);
+ return failingResult;
+ } finally {
+ context.getRegionWriteValidationRWLock().writeLock().unlock();
+ }
+ } else {
+ return super.visitCreateMultiTimeSeries(node, context);
+ }
+ }
+
+ @Override
+ public RegionExecutionResult visitInternalCreateTimeSeries(
+ InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ ISchemaRegion schemaRegion =
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ if
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus))
{
+ context.getRegionWriteValidationRWLock().writeLock().lock();
+ try {
+ List<TSStatus> failingStatus = new ArrayList<>();
+ List<TSStatus> alreadyExistingStatus = new ArrayList<>();
+ MeasurementGroup measurementGroup = node.getMeasurementGroup();
+ Map<Integer, MetadataException> failingMeasurementMap =
+ schemaRegion.checkMeasurementExistence(
+ node.getDevicePath(),
+ measurementGroup.getMeasurements(),
+ measurementGroup.getAliasList());
+ MetadataException metadataException;
+ // filter failed measurement and keep the rest for execution
+ for (Map.Entry<Integer, MetadataException> failingMeasurement :
+ failingMeasurementMap.entrySet()) {
+ metadataException = failingMeasurement.getValue();
+ if (metadataException.getErrorCode()
+ == TSStatusCode.MEASUREMENT_ALREADY_EXIST.getStatusCode()) {
+ LOGGER.info(
+ "There's no need to internal create timeseries. {}",
+ failingMeasurement.getValue().getMessage());
+ alreadyExistingStatus.add(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(),
metadataException.getMessage()));
+ } else {
+ LOGGER.error("Metadata error: ", metadataException);
+ failingStatus.add(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(),
metadataException.getMessage()));
+ }
+ measurementGroup.removeMeasurement(failingMeasurement.getKey());
+ }
+
+ RegionExecutionResult executionResult =
+ super.visitInternalCreateTimeSeries(node, context);
+
+ if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
+ return executionResult;
+ }
+
+ TSStatus executionStatus = executionResult.getStatus();
+
+ // separate the measurement_already_exist exception and other
exceptions process,
+ // measurement_already_exist exception is acceptable due to
concurrent timeseries creation
+ if (failingStatus.isEmpty()) {
+ if (executionStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (executionStatus.getSubStatus().get(0).getCode()
+ == TSStatusCode.MEASUREMENT_ALREADY_EXIST.getStatusCode()) {
+ // there's only measurement_already_exist exception
+ alreadyExistingStatus.addAll(executionStatus.getSubStatus());
+ } else {
+ failingStatus.addAll(executionStatus.getSubStatus());
+ }
+ } else if (executionStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failingStatus.add(executionStatus);
+ }
+ } else {
+ if (executionStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (executionStatus.getSubStatus().get(0).getCode()
+ != TSStatusCode.MEASUREMENT_ALREADY_EXIST.getStatusCode()) {
+ failingStatus.addAll(executionStatus.getSubStatus());
+ }
+ } else if (executionStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failingStatus.add(executionStatus);
+ }
+ }
+
+ TSStatus status;
+ if (failingStatus.isEmpty()) {
+ status = RpcUtils.getStatus(failingStatus);
+ } else {
+ status = RpcUtils.getStatus(alreadyExistingStatus);
+ }
+
+ RegionExecutionResult result = new RegionExecutionResult();
+ result.setAccepted(false);
+ result.setMessage(status.getMessage());
+ result.setStatus(status);
+ return result;
+ } finally {
+ context.getRegionWriteValidationRWLock().writeLock().unlock();
+ }
+ } else {
+ return super.visitInternalCreateTimeSeries(node, context);
+ }
+ }
+ }
+
+ private static class WritePlanNodeExecutionContext {
+
+ private final ConsensusGroupId regionId;
+
+ private final ReentrantReadWriteLock regionRWLock;
+
+ WritePlanNodeExecutionContext(ConsensusGroupId regionId,
ReentrantReadWriteLock regionRWLock) {
+ this.regionId = regionId;
+ this.regionRWLock = regionRWLock;
+ }
+
+ public ConsensusGroupId getRegionId() {
+ return regionId;
+ }
+
+ public ReentrantReadWriteLock getRegionWriteValidationRWLock() {
+ return regionRWLock;
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
index bceb07d210..391aed20dc 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
@@ -63,7 +63,7 @@ public class SchemaValidator {
try {
insertNode.validateAndSetSchema(schemaTree);
} catch (QueryProcessException | MetadataException e) {
- throw new SemanticException(e.getMessage());
+ throw new SemanticException(e);
}
return schemaTree;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
index 52c5fa02c9..fa102082a9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
@@ -118,6 +118,33 @@ public class MeasurementGroup {
attributesList.add(attributes);
}
+ public void removeMeasurement(int index) {
+ measurements.remove(index);
+ dataTypes.remove(index);
+ encodings.remove(index);
+ compressors.remove(index);
+
+ if (aliasList != null) {
+ aliasList.remove(index);
+ }
+
+ if (propsList != null) {
+ propsList.remove(index);
+ }
+
+ if (tagsList != null) {
+ tagsList.remove(index);
+ }
+
+ if (attributesList != null) {
+ attributesList.remove(index);
+ }
+ }
+
+ public boolean isEmpty() {
+ return measurements.isEmpty();
+ }
+
public void serialize(ByteBuffer byteBuffer) {
// measurements
ReadWriteIOUtils.write(measurements.size(), byteBuffer);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a76aad7965..4a9461612b 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -24,21 +24,15 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
-import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.execution.executor.RegionExecutionResult;
+import org.apache.iotdb.db.mpp.execution.executor.RegionReadExecutor;
+import org.apache.iotdb.db.mpp.execution.executor.RegionWriteExecutor;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
@@ -177,7 +171,13 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
TSendPlanNodeResp sendPlanNodeResp =
client.sendPlanNode(sendPlanNodeReq);
if (!sendPlanNodeResp.accepted) {
logger.error(sendPlanNodeResp.getStatus().message);
- throw new
FragmentInstanceDispatchException(sendPlanNodeResp.getStatus());
+ if (sendPlanNodeResp.getStatus() == null) {
+ throw new FragmentInstanceDispatchException(
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR,
sendPlanNodeResp.getMessage()));
+ } else {
+ throw new
FragmentInstanceDispatchException(sendPlanNodeResp.getStatus());
+ }
}
break;
default:
@@ -215,87 +215,27 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
switch (instance.getType()) {
case READ:
- // execute fragment instance in state machine
- ConsensusReadResponse readResponse;
- try {
- if (groupId instanceof DataRegionId) {
- readResponse = DataRegionConsensusImpl.getInstance().read(groupId,
instance);
- } else {
- readResponse =
SchemaRegionConsensusImpl.getInstance().read(groupId, instance);
- }
- if (readResponse == null) {
- logger.error("ReadResponse is null");
- throw new FragmentInstanceDispatchException(
- RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
"ReadResponse is null"));
- }
- } catch (Throwable t) {
- logger.error("Execute FragmentInstance in ConsensusGroup {}
failed.", groupId, t);
+ RegionReadExecutor readExecutor = new RegionReadExecutor();
+ RegionExecutionResult readResult = readExecutor.execute(groupId,
instance);
+ if (!readResult.isAccepted()) {
+ logger.error(readResult.getMessage());
throw new FragmentInstanceDispatchException(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR,
- "Execute FragmentInstance failed. " + t.getMessage()));
- }
- if (!readResponse.isSuccess()) {
- logger.error(
- "dispatch FragmentInstance {} locally failed. ",
- instance,
- readResponse.getException());
- throw new FragmentInstanceDispatchException(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR,
- "Execute FragmentInstance failed: "
- + (readResponse.getException() == null
- ? ""
- : readResponse.getException().getMessage())));
- } else {
- FragmentInstanceInfo info = (FragmentInstanceInfo)
readResponse.getDataset();
- if (info.getState().isFailed()) {
- throw new FragmentInstanceDispatchException(
- RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
info.getMessage()));
- }
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
readResult.getMessage()));
}
break;
case WRITE:
PlanNode planNode = instance.getFragment().getPlanNodeTree();
- boolean hasFailedMeasurement = false;
- String partialInsertMessage = null;
- if (planNode instanceof InsertNode) {
- InsertNode insertNode = (InsertNode) planNode;
- try {
- SchemaValidator.validate(insertNode);
- } catch (SemanticException e) {
+ RegionWriteExecutor writeExecutor = new RegionWriteExecutor();
+ RegionExecutionResult writeResult = writeExecutor.execute(groupId,
planNode);
+ if (!writeResult.isAccepted()) {
+ logger.error(writeResult.getMessage());
+ if (writeResult.getStatus() == null) {
throw new FragmentInstanceDispatchException(
-
RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(),
e.getMessage()));
- }
- hasFailedMeasurement = insertNode.hasFailedMeasurements();
- if (hasFailedMeasurement) {
- partialInsertMessage =
- String.format(
- "Fail to insert measurements %s caused by %s",
- insertNode.getFailedMeasurements(),
insertNode.getFailedMessages());
- logger.warn(partialInsertMessage);
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
writeResult.getMessage()));
+ } else {
+ throw new
FragmentInstanceDispatchException(writeResult.getStatus());
}
}
- ConsensusWriteResponse writeResponse;
- if (groupId instanceof DataRegionId) {
- writeResponse = DataRegionConsensusImpl.getInstance().write(groupId,
planNode);
- } else {
- writeResponse =
SchemaRegionConsensusImpl.getInstance().write(groupId, planNode);
- }
-
- if (!writeResponse.isSuccessful()) {
- logger.error(writeResponse.getErrorMessage());
- TSStatus failureStatus =
- writeResponse.getStatus() != null
- ? writeResponse.getStatus()
- : RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR,
writeResponse.getErrorMessage());
- throw new FragmentInstanceDispatchException(failureStatus);
- } else if (hasFailedMeasurement) {
- throw new FragmentInstanceDispatchException(
- RpcUtils.getStatus(
- TSStatusCode.METADATA_ERROR.getStatusCode(),
partialInsertMessage));
- }
break;
default:
throw new FragmentInstanceDispatchException(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 1fd5c68e1a..0b112bec46 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
@@ -304,6 +305,9 @@ public class DataNode implements DataNodeMBean {
}
}
+ // must init after SchemaEngine and StorageEngine prepared well
+ DataNodeRegionManager.getInstance().init();
+
registerManager.register(SyncService.getInstance());
registerManager.register(UpgradeSevice.getINSTANCE());
// in mpp mode we temporarily don't start settle service because it uses
StorageEngine directly
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index a3bb7f546d..33d8b9de14 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -42,7 +42,6 @@ import
org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -61,6 +60,9 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.executor.RegionExecutionResult;
+import org.apache.iotdb.db.mpp.execution.executor.RegionReadExecutor;
+import org.apache.iotdb.db.mpp.execution.executor.RegionWriteExecutor;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
@@ -158,11 +160,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
- private final DataNodeRegionManager regionManager;
+ private final DataNodeRegionManager regionManager =
DataNodeRegionManager.getInstance();
public DataNodeInternalRPCServiceImpl() {
super();
- regionManager = new DataNodeRegionManager(schemaEngine, storageEngine);
}
@Override
@@ -192,39 +193,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return resp;
}
- // execute fragment instance in state machine
- ConsensusReadResponse readResponse;
- try (SetThreadName threadName = new
SetThreadName(fragmentInstance.getId().getFullId())) {
- if (groupId instanceof DataRegionId) {
- readResponse = DataRegionConsensusImpl.getInstance().read(groupId,
fragmentInstance);
- } else {
- readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId,
fragmentInstance);
- }
- TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
- if (!readResponse.isSuccess()) {
- LOGGER.error(
- "Execute FragmentInstance in ConsensusGroup {} failed.",
- req.getConsensusGroupId(),
- readResponse.getException());
- resp.setAccepted(false);
- resp.setMessage(
- "Execute FragmentInstance failed: "
- + (readResponse.getException() == null
- ? ""
- : readResponse.getException().getMessage()));
- } else {
- FragmentInstanceInfo info = (FragmentInstanceInfo)
readResponse.getDataset();
- resp.setAccepted(!info.getState().isFailed());
- resp.setMessage(info.getMessage());
- }
- return resp;
- } catch (Throwable t) {
- LOGGER.error(
- "Execute FragmentInstance in ConsensusGroup {} failed.",
req.getConsensusGroupId(), t);
- TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
- resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
- return resp;
- }
+ RegionReadExecutor executor = new RegionReadExecutor();
+ RegionExecutionResult executionResult = executor.execute(groupId,
fragmentInstance);
+ TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
+ resp.setAccepted(executionResult.isAccepted());
+ resp.setMessage(executionResult.getMessage());
+
+ return resp;
}
@Override
@@ -233,7 +208,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
- return regionManager.executePlanNode(groupId, planNode);
+ RegionWriteExecutor executor = new RegionWriteExecutor();
+ TSendPlanNodeResp resp = new TSendPlanNodeResp();
+ RegionExecutionResult executionResult = executor.execute(groupId,
planNode);
+ resp.setAccepted(executionResult.isAccepted());
+ resp.setMessage(executionResult.getMessage());
+ resp.setStatus(executionResult.getStatus());
+ return resp;
}
@Override
@@ -355,10 +336,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
if (filteredPatternTree.isEmpty()) {
continue;
}
+ RegionWriteExecutor executor = new RegionWriteExecutor();
status =
- regionManager.executeSchemaPlanNode(
- new SchemaRegionId(consensusGroupId.getId()),
- new ConstructSchemaBlackListNode(new PlanNodeId(""),
filteredPatternTree));
+ executor
+ .execute(
+ new SchemaRegionId(consensusGroupId.getId()),
+ new ConstructSchemaBlackListNode(new PlanNodeId(""),
filteredPatternTree))
+ .getStatus();
if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
preDeletedNum += Integer.parseInt(status.getMessage());
} else {
@@ -388,10 +372,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
if (filteredPatternTree.isEmpty()) {
continue;
}
+ RegionWriteExecutor executor = new RegionWriteExecutor();
status =
- regionManager.executeSchemaPlanNode(
- new SchemaRegionId(consensusGroupId.getId()),
- new RollbackSchemaBlackListNode(new PlanNodeId(""),
filteredPatternTree));
+ executor
+ .execute(
+ new SchemaRegionId(consensusGroupId.getId()),
+ new RollbackSchemaBlackListNode(new PlanNodeId(""),
filteredPatternTree))
+ .getStatus();
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failureList.add(status);
}
@@ -465,10 +452,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
List<TSStatus> failureList = new ArrayList<>();
TSStatus status;
for (TConsensusGroupId consensusGroupId : req.getDataRegionIdList()) {
+ RegionWriteExecutor executor = new RegionWriteExecutor();
status =
- regionManager.executeDeleteDataForDeleteTimeSeries(
- new DataRegionId(consensusGroupId.getId()),
- new DeleteDataNode(new PlanNodeId(""), pathList, Long.MIN_VALUE,
Long.MAX_VALUE));
+ executor
+ .execute(
+ new DataRegionId(consensusGroupId.getId()),
+ new DeleteDataNode(new PlanNodeId(""), pathList,
Long.MIN_VALUE, Long.MAX_VALUE))
+ .getStatus();
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failureList.add(status);
}
@@ -496,10 +486,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
if (filteredPatternTree.isEmpty()) {
continue;
}
+ RegionWriteExecutor executor = new RegionWriteExecutor();
status =
- regionManager.executeSchemaPlanNode(
- new SchemaRegionId(consensusGroupId.getId()),
- new DeleteTimeSeriesNode(new PlanNodeId(""),
filteredPatternTree));
+ executor
+ .execute(
+ new SchemaRegionId(consensusGroupId.getId()),
+ new DeleteTimeSeriesNode(new PlanNodeId(""),
filteredPatternTree))
+ .getStatus();
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failureList.add(status);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
index e82b25f1f1..3a9ecdbc0a 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
@@ -32,17 +32,11 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -63,17 +57,25 @@ public class DataNodeRegionManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeRegionManager.class);
- private final SchemaEngine schemaEngine;
- private final StorageEngineV2 storageEngine;
+ private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
+ private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
private final Map<SchemaRegionId, ReentrantReadWriteLock>
schemaRegionLockMap =
new ConcurrentHashMap<>();
private final Map<DataRegionId, ReentrantReadWriteLock> dataRegionLockMap =
new ConcurrentHashMap<>();
- public DataNodeRegionManager(SchemaEngine schemaEngine, StorageEngineV2
storageEngine) {
- this.schemaEngine = schemaEngine;
- this.storageEngine = storageEngine;
+ private static class DataNodeRegionManagerHolder {
+ private static final DataNodeRegionManager INSTANCE = new
DataNodeRegionManager();
+
+ private DataNodeRegionManagerHolder() {}
+ }
+
+ public static DataNodeRegionManager getInstance() {
+ return DataNodeRegionManagerHolder.INSTANCE;
+ }
+
+ public void init() {
schemaEngine
.getAllSchemaRegions()
.forEach(
@@ -88,125 +90,17 @@ public class DataNodeRegionManager {
dataRegionId -> dataRegionLockMap.put(dataRegionId, new
ReentrantReadWriteLock(false)));
}
- public TSendPlanNodeResp executePlanNode(ConsensusGroupId groupId, PlanNode
planNode) {
- if (planNode instanceof InsertNode) {
- return executeDataInsert((DataRegionId) groupId, (InsertNode) planNode);
- } else {
- TSendPlanNodeResp response = new TSendPlanNodeResp();
- ConsensusWriteResponse writeResponse =
executePlanNodeInConsensusLayer(groupId, planNode);
- // TODO need consider more status
- if (writeResponse.getStatus() != null) {
- response.setAccepted(
- TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
writeResponse.getStatus().getCode());
- response.setMessage(writeResponse.getStatus().message);
- response.setStatus(writeResponse.getStatus());
- } else {
- LOGGER.error(
- "Something wrong happened while calling consensus layer's write
API.",
- writeResponse.getException());
- response.setAccepted(false);
- response.setMessage(writeResponse.getException().getMessage());
- }
- return response;
- }
+ public void clear() {
+ schemaRegionLockMap.clear();
+ dataRegionLockMap.clear();
}
- private ConsensusWriteResponse executePlanNodeInConsensusLayer(
- ConsensusGroupId groupId, PlanNode planNode) {
- if (groupId instanceof DataRegionId) {
- return DataRegionConsensusImpl.getInstance().write(groupId, planNode);
- } else {
- return SchemaRegionConsensusImpl.getInstance().write(groupId, planNode);
- }
- }
-
- private TSendPlanNodeResp executeDataInsert(DataRegionId dataRegionId,
InsertNode insertNode) {
- TSendPlanNodeResp response = new TSendPlanNodeResp();
- dataRegionLockMap.get(dataRegionId).readLock().lock();
- try {
- try {
- SchemaValidator.validate(insertNode);
- } catch (SemanticException e) {
- response.setAccepted(false);
- response.setStatus(
- RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(),
e.getMessage()));
- response.setMessage(e.getMessage());
- return response;
- }
- boolean hasFailedMeasurement = insertNode.hasFailedMeasurements();
- String partialInsertMessage = null;
- if (hasFailedMeasurement) {
- partialInsertMessage =
- String.format(
- "Fail to insert measurements %s caused by %s",
- insertNode.getFailedMeasurements(),
insertNode.getFailedMessages());
- LOGGER.warn(partialInsertMessage);
- }
-
- ConsensusWriteResponse writeResponse =
- executePlanNodeInConsensusLayer(dataRegionId, insertNode);
+ private DataNodeRegionManager() {}
- // TODO need consider more status
- if (writeResponse.getStatus() != null) {
- response.setAccepted(
- !hasFailedMeasurement
- && TSStatusCode.SUCCESS_STATUS.getStatusCode()
- == writeResponse.getStatus().getCode());
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
writeResponse.getStatus().getCode()) {
- response.setMessage(writeResponse.getStatus().message);
- response.setStatus(writeResponse.getStatus());
- } else if (hasFailedMeasurement) {
- response.setMessage(partialInsertMessage);
- response.setStatus(
- RpcUtils.getStatus(
- TSStatusCode.METADATA_ERROR.getStatusCode(),
partialInsertMessage));
- } else {
- response.setMessage(writeResponse.getStatus().message);
- }
- } else {
- LOGGER.error(
- "Something wrong happened while calling consensus layer's write
API.",
- writeResponse.getException());
- response.setAccepted(false);
- response.setMessage(writeResponse.getException().getMessage());
- }
-
- return response;
- } finally {
- dataRegionLockMap.get(dataRegionId).readLock().unlock();
- }
- }
-
- public TSStatus executeSchemaPlanNode(SchemaRegionId schemaRegionId,
PlanNode planNode) {
- ConsensusWriteResponse writeResponse =
- executePlanNodeInConsensusLayer(schemaRegionId, planNode);
- TSStatus status = writeResponse.getStatus();
- if (status == null) {
- LOGGER.error(
- "Something wrong happened while calling consensus layer's write
API.",
- writeResponse.getException());
- return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
- }
- return status;
- }
-
- public TSStatus executeDeleteDataForDeleteTimeSeries(
- DataRegionId dataRegionId, PlanNode planNode) {
- dataRegionLockMap.get(dataRegionId).writeLock().lock();
- try {
- ConsensusWriteResponse writeResponse =
- executePlanNodeInConsensusLayer(dataRegionId, planNode);
- TSStatus status = writeResponse.getStatus();
- if (status == null) {
- LOGGER.error(
- "Something wrong happened while calling consensus layer's write
API.",
- writeResponse.getException());
- return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
- }
- return status;
- } finally {
- dataRegionLockMap.get(dataRegionId).writeLock().unlock();
- }
+ public ReentrantReadWriteLock getRegionLock(ConsensusGroupId
consensusGroupId) {
+ return consensusGroupId instanceof DataRegionId
+ ? dataRegionLockMap.get((DataRegionId) consensusGroupId)
+ : schemaRegionLockMap.get((SchemaRegionId) consensusGroupId);
}
public TSStatus createSchemaRegion(TRegionReplicaSet regionReplicaSet,
String storageGroup) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index edc327ad7c..e3008808d0 100644
---
a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlign
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.service.thrift.impl.DataNodeInternalRPCServiceImpl;
+import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
@@ -76,6 +77,7 @@ public class DataNodeInternalRPCServiceImplTest {
configNode.getBelongedSchemaRegionIdWithAutoCreate(new
PartialPath("root.ln"));
DataRegionConsensusImpl.setupAndGetInstance().start();
SchemaRegionConsensusImpl.setupAndGetInstance().start();
+ DataNodeRegionManager.getInstance().init();
}
@Before
@@ -99,6 +101,7 @@ public class DataNodeInternalRPCServiceImplTest {
@AfterClass
public static void tearDownAfterClass() throws IOException,
StorageEngineException {
+ DataNodeRegionManager.getInstance().clear();
DataRegionConsensusImpl.getInstance().stop();
SchemaRegionConsensusImpl.getInstance().stop();
IoTDB.configManager.clear();