This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3d7413ce42f Report error message when deleting template time series
with multi-level wildcard pattern
3d7413ce42f is described below
commit 3d7413ce42fa898db3ef14a32e0f93c0659c13d6
Author: Chen YZ <[email protected]>
AuthorDate: Fri Apr 19 16:28:02 2024 +0800
Report error message when deleting template time series with multi-level
wildcard pattern
---
.../db/it/schema/IoTDBDeleteTimeSeriesIT.java | 34 ++++
.../confignode/client/DataNodeRequestType.java | 1 +
.../client/async/AsyncDataNodeClientPool.java | 8 +
.../client/async/handlers/AsyncClientHandler.java | 10 +
.../CheckSchemaRegionUsingTemplateRPCHandler.java | 93 +++++++++
.../iotdb/confignode/manager/ConfigManager.java | 10 +
.../impl/schema/DataNodeRegionTaskExecutor.java | 24 ++-
.../procedure/impl/schema/SchemaUtils.java | 211 +++++++++++++++++++++
.../impl/schema/UnsetTemplateProcedure.java | 94 ++-------
.../impl/DataNodeInternalRPCServiceImpl.java | 33 +++-
.../src/main/thrift/datanode.thrift | 11 ++
11 files changed, 439 insertions(+), 90 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteTimeSeriesIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteTimeSeriesIT.java
index 743ff74dfc3..b737dccf71f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteTimeSeriesIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteTimeSeriesIT.java
@@ -404,12 +404,44 @@ public class IoTDBDeleteTimeSeriesIT extends
AbstractSchemaIT {
}
}
+ @Test
+ public void deleteTemplateTimeSeriesTest() throws Exception {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE root.db");
+ statement.execute("CREATE DEVICE TEMPLATE t1 (s1 INT64, s2 DOUBLE)");
+ statement.execute("SET DEVICE TEMPLATE t1 to root.db");
+ statement.execute("CREATE TIMESERIES USING DEVICE TEMPLATE ON
root.db.d1");
+ try {
+ statement.execute("DELETE TIMESERIES root.**");
+ Assert.fail();
+ } catch (SQLException e) {
+ Assert.assertTrue(
+ e.getMessage()
+ .contains(
+ TSStatusCode.PATH_NOT_EXIST.getStatusCode()
+ + ": Timeseries [root.**] does not exist or is
represented by device template"));
+ }
+ try {
+ statement.execute("DELETE TIMESERIES root.db.**");
+ Assert.fail();
+ } catch (SQLException e) {
+ Assert.assertTrue(
+ e.getMessage()
+ .contains(
+ TSStatusCode.PATH_NOT_EXIST.getStatusCode()
+ + ": Timeseries [root.db.**] does not exist or is
represented by device template"));
+ }
+ }
+ }
+
@Test
public void deleteTimeSeriesAndReturnPathNotExistsTest() throws Exception {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
try {
statement.execute("delete timeseries root.**");
+ Assert.fail();
} catch (SQLException e) {
Assert.assertTrue(
e.getMessage()
@@ -428,6 +460,7 @@ public class IoTDBDeleteTimeSeriesIT extends
AbstractSchemaIT {
}
int cnt = 0;
+
try (ResultSet resultSet = statement.executeQuery("select count(s1) from
root.*.d1")) {
while (resultSet.next()) {
StringBuilder ans = new
StringBuilder(resultSet.getString(TIMESTAMP_STR));
@@ -442,6 +475,7 @@ public class IoTDBDeleteTimeSeriesIT extends
AbstractSchemaIT {
try {
statement.execute("delete timeseries root.*.d1.s3");
+ Assert.fail();
} catch (SQLException e) {
Assert.assertTrue(
e.getMessage()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 1c7bfa27530..381ae40be92 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -101,6 +101,7 @@ public enum DataNodeRequestType {
ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
DEACTIVATE_TEMPLATE,
COUNT_PATHS_USING_TEMPLATE,
+ CHECK_SCHEMA_REGION_USING_TEMPLATE,
CHECK_TIMESERIES_EXISTENCE,
CONSTRUCT_VIEW_SCHEMA_BLACK_LIST,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index a697a69b2d9..dd201a66b57 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -40,10 +40,12 @@ import
org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHan
import
org.apache.iotdb.confignode.client.async.handlers.rpc.PipePushMetaRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.TransferLeaderRPCHandler;
+import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import
org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq;
@@ -438,6 +440,12 @@ public class AsyncDataNodeClientPool {
(CountPathsUsingTemplateRPCHandler)
clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
break;
+ case CHECK_SCHEMA_REGION_USING_TEMPLATE:
+ client.checkSchemaRegionUsingTemplate(
+ (TCheckSchemaRegionUsingTemplateReq)
clientHandler.getRequest(requestId),
+ (CheckSchemaRegionUsingTemplateRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
+ break;
case CHECK_TIMESERIES_EXISTENCE:
client.checkTimeSeriesExistence(
(TCheckTimeSeriesExistenceReq)
clientHandler.getRequest(requestId),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 7cd86af6ef7..d4b0e7f52b7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -31,8 +31,10 @@ import
org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHan
import
org.apache.iotdb.confignode.client.async.handlers.rpc.PipePushMetaRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.TransferLeaderRPCHandler;
+import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
@@ -200,6 +202,14 @@ public class AsyncClientHandler<Q, R> {
dataNodeLocationMap,
(Map<Integer, TCountPathsUsingTemplateResp>) responseMap,
countDownLatch);
+ case CHECK_SCHEMA_REGION_USING_TEMPLATE:
+ return new CheckSchemaRegionUsingTemplateRPCHandler(
+ requestType,
+ requestId,
+ targetDataNode,
+ dataNodeLocationMap,
+ (Map<Integer, TCheckSchemaRegionUsingTemplateResp>) responseMap,
+ countDownLatch);
case CHECK_TIMESERIES_EXISTENCE:
return new CheckTimeSeriesExistenceRPCHandler(
requestType,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
new file mode 100644
index 00000000000..ad695162992
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers.rpc.subscription;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import
org.apache.iotdb.confignode.client.async.handlers.rpc.AbstractAsyncRPCHandler;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class CheckSchemaRegionUsingTemplateRPCHandler
+ extends AbstractAsyncRPCHandler<TCheckSchemaRegionUsingTemplateResp> {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(CheckSchemaRegionUsingTemplateRPCHandler.class);
+
+ public CheckSchemaRegionUsingTemplateRPCHandler(
+ DataNodeRequestType requestType,
+ int requestId,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ Map<Integer, TCheckSchemaRegionUsingTemplateResp> responseMap,
+ CountDownLatch countDownLatch) {
+ super(requestType, requestId, targetDataNode, dataNodeLocationMap,
responseMap, countDownLatch);
+ }
+
+ @Override
+ public void onComplete(TCheckSchemaRegionUsingTemplateResp response) {
+ TSStatus tsStatus = response.getStatus();
+ responseMap.put(requestId, response);
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataNodeLocationMap.remove(requestId);
+ LOGGER.info(
+ "Successfully check schema region using template on DataNode: {}",
targetDataNode);
+ } else if (tsStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ dataNodeLocationMap.remove(requestId);
+ LOGGER.error(
+ "Failed to check schema region using template on DataNode {}, {}",
+ targetDataNode,
+ tsStatus);
+ } else {
+ LOGGER.error(
+ "Failed to check schema region using template on DataNode {}, {}",
+ targetDataNode,
+ tsStatus);
+ }
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ String errorMsg =
+ "Count paths using template error on DataNode: {id="
+ + targetDataNode.getDataNodeId()
+ + ", internalEndPoint="
+ + targetDataNode.getInternalEndPoint()
+ + "}"
+ + e.getMessage();
+ LOGGER.error(errorMsg);
+
+ countDownLatch.countDown();
+ TCheckSchemaRegionUsingTemplateResp resp = new
TCheckSchemaRegionUsingTemplateResp();
+ resp.setStatus(
+ new TSStatus(
+
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
+ responseMap.put(requestId, resp);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index ac6e7e6ec93..520e4dd112b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.path.PathPatternUtil;
@@ -109,6 +110,7 @@ import
org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
+import org.apache.iotdb.confignode.procedure.impl.schema.SchemaUtils;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
@@ -1814,6 +1816,7 @@ public class ConfigManager implements IManager {
boolean canOptimize = false;
HashSet<TDatabaseSchema> deleteDatabaseSchemas = new HashSet<>();
List<PartialPath> deleteTimeSeriesPatternPaths = new ArrayList<>();
+ List<PartialPath> deleteDatabasePatternPaths = new ArrayList<>();
for (PartialPath path : rawPatternTree.getAllPathPatterns()) {
if (PathPatternUtil.isMultiLevelMatchWildcard(path.getMeasurement())
&& !path.getDevicePath().hasWildcard()) {
@@ -1821,6 +1824,7 @@ public class ConfigManager implements IManager {
getClusterSchemaManager().getMatchedDatabaseSchemasByPrefix(path.getDevicePath());
if (!databaseSchemaMap.isEmpty()) {
deleteDatabaseSchemas.addAll(databaseSchemaMap.values());
+ deleteDatabasePatternPaths.add(path);
canOptimize = true;
continue;
}
@@ -1830,6 +1834,12 @@ public class ConfigManager implements IManager {
if (!canOptimize) {
return procedureManager.deleteTimeSeries(queryId, rawPatternTree,
isGeneratedByPipe);
}
+ // check if the database is using template
+ try {
+ SchemaUtils.checkSchemaRegionUsingTemplate(this,
deleteDatabasePatternPaths);
+ } catch (MetadataException e) {
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ }
if (!deleteTimeSeriesPatternPaths.isEmpty()) {
// 1. delete time series that can not be optimized
PathPatternTree deleteTimeSeriesPatternTree = new PathPatternTree();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java
index b360d4454af..f5071e95327 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import java.util.ArrayList;
@@ -40,7 +41,7 @@ import static
org.apache.iotdb.confignode.procedure.impl.schema.DataNodeRegionGr
public abstract class DataNodeRegionTaskExecutor<Q, R> {
- protected final ConfigNodeProcedureEnv env;
+ protected final ConfigManager configManager;
protected final Map<TConsensusGroupId, TRegionReplicaSet>
targetSchemaRegionGroup;
protected final boolean executeOnAllReplicaset;
@@ -50,13 +51,26 @@ public abstract class DataNodeRegionTaskExecutor<Q, R> {
private boolean isInterrupted = false;
+ protected DataNodeRegionTaskExecutor(
+ ConfigManager configManager,
+ Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup,
+ boolean executeOnAllReplicaset,
+ DataNodeRequestType dataNodeRequestType,
+ BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q>
dataNodeRequestGenerator) {
+ this.configManager = configManager;
+ this.targetSchemaRegionGroup = targetSchemaRegionGroup;
+ this.executeOnAllReplicaset = executeOnAllReplicaset;
+ this.dataNodeRequestType = dataNodeRequestType;
+ this.dataNodeRequestGenerator = dataNodeRequestGenerator;
+ }
+
protected DataNodeRegionTaskExecutor(
ConfigNodeProcedureEnv env,
Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup,
boolean executeOnAllReplicaset,
DataNodeRequestType dataNodeRequestType,
BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q>
dataNodeRequestGenerator) {
- this.env = env;
+ this.configManager = env.getConfigManager();
this.targetSchemaRegionGroup = targetSchemaRegionGroup;
this.executeOnAllReplicaset = executeOnAllReplicaset;
this.dataNodeRequestType = dataNodeRequestType;
@@ -70,8 +84,7 @@ public abstract class DataNodeRegionTaskExecutor<Q, R> {
executeOnAllReplicaset
? getAllReplicaDataNodeRegionGroupMap(targetSchemaRegionGroup)
: getLeaderDataNodeRegionGroupMap(
- env.getConfigManager().getLoadManager().getRegionLeaderMap(),
- targetSchemaRegionGroup);
+ configManager.getLoadManager().getRegionLeaderMap(),
targetSchemaRegionGroup);
while (!dataNodeConsensusGroupIdMap.isEmpty()) {
AsyncClientHandler<Q, R> clientHandler =
prepareRequestHandler(dataNodeConsensusGroupIdMap);
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
@@ -142,8 +155,7 @@ public abstract class DataNodeRegionTaskExecutor<Q, R> {
Map<TDataNodeLocation, List<TConsensusGroupId>> availableDataNodeLocation
= new HashMap<>();
- Map<TConsensusGroupId, Integer> leaderMap =
- env.getConfigManager().getLoadManager().getRegionLeaderMap();
+ Map<TConsensusGroupId, Integer> leaderMap =
configManager.getLoadManager().getRegionLeaderMap();
for (List<TConsensusGroupId> consensusGroupIdList :
failedDataNodeConsensusGroupIdMap.values()) {
for (TConsensusGroupId consensusGroupId : consensusGroupIdList) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java
new file mode 100644
index 00000000000..1dc978cc404
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
+import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class SchemaUtils {
+ /**
+ * Check whether the specific template is activated on the given pattern
tree.
+ *
+ * @return true if the template is activated on the given pattern tree,
false otherwise.
+ * @throws MetadataException if any error occurs when checking the
activation.
+ */
+ public static boolean checkDataNodeTemplateActivation(
+ ConfigManager configManager, PathPatternTree patternTree, Template
template)
+ throws MetadataException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
+ try {
+ patternTree.serialize(dataOutputStream);
+ } catch (IOException ignored) {
+ }
+ ByteBuffer patternTreeBytes =
ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+
+ Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup =
+ configManager.getRelatedSchemaRegionGroup(patternTree);
+
+ List<TCountPathsUsingTemplateResp> respList = new ArrayList<>();
+ final MetadataException[] exception = {null};
+ DataNodeRegionTaskExecutor<TCountPathsUsingTemplateReq,
TCountPathsUsingTemplateResp>
+ regionTask =
+ new DataNodeRegionTaskExecutor<
+ TCountPathsUsingTemplateReq, TCountPathsUsingTemplateResp>(
+ configManager,
+ relatedSchemaRegionGroup,
+ false,
+ DataNodeRequestType.COUNT_PATHS_USING_TEMPLATE,
+ ((dataNodeLocation, consensusGroupIdList) ->
+ new TCountPathsUsingTemplateReq(
+ template.getId(), patternTreeBytes,
consensusGroupIdList))) {
+
+ @Override
+ protected List<TConsensusGroupId> processResponseOfOneDataNode(
+ TDataNodeLocation dataNodeLocation,
+ List<TConsensusGroupId> consensusGroupIdList,
+ TCountPathsUsingTemplateResp response) {
+ respList.add(response);
+ List<TConsensusGroupId> failedRegionList = new ArrayList<>();
+ if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return failedRegionList;
+ }
+
+ if (response.getStatus().getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ List<TSStatus> subStatus =
response.getStatus().getSubStatus();
+ for (int i = 0; i < subStatus.size(); i++) {
+ if (subStatus.get(i).getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failedRegionList.add(consensusGroupIdList.get(i));
+ }
+ }
+ } else {
+ failedRegionList.addAll(consensusGroupIdList);
+ }
+ return failedRegionList;
+ }
+
+ @Override
+ protected void onAllReplicasetFailure(
+ TConsensusGroupId consensusGroupId, Set<TDataNodeLocation>
dataNodeLocationSet) {
+ exception[0] =
+ new MetadataException(
+ String.format(
+ "all replicaset of schemaRegion %s failed. %s",
+ consensusGroupId.id, dataNodeLocationSet));
+ interruptTask();
+ }
+ };
+ regionTask.execute();
+ if (exception[0] != null) {
+ throw exception[0];
+ }
+ for (TCountPathsUsingTemplateResp resp : respList) {
+ if (resp.count > 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Check whether any template is activated on the given schema regions.
+ *
+ * @return true if the template is activated on the given pattern tree,
false otherwise.
+ * @throws MetadataException if any error occurs when checking the
activation.
+ */
+ public static void checkSchemaRegionUsingTemplate(
+ ConfigManager configManager, List<PartialPath>
deleteDatabasePatternPaths)
+ throws MetadataException {
+
+ PathPatternTree deleteDatabasePatternTree = new PathPatternTree();
+ for (PartialPath path : deleteDatabasePatternPaths) {
+ deleteDatabasePatternTree.appendPathPattern(path);
+ }
+ deleteDatabasePatternTree.constructTree();
+ Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup =
+ configManager.getRelatedSchemaRegionGroup(deleteDatabasePatternTree);
+ List<TCheckSchemaRegionUsingTemplateResp> respList = new ArrayList<>();
+ final MetadataException[] exception = {null};
+ DataNodeRegionTaskExecutor<
+ TCheckSchemaRegionUsingTemplateReq,
TCheckSchemaRegionUsingTemplateResp>
+ regionTask =
+ new DataNodeRegionTaskExecutor<
+ TCheckSchemaRegionUsingTemplateReq,
TCheckSchemaRegionUsingTemplateResp>(
+ configManager,
+ relatedSchemaRegionGroup,
+ false,
+ DataNodeRequestType.CHECK_SCHEMA_REGION_USING_TEMPLATE,
+ ((dataNodeLocation, consensusGroupIdList) ->
+ new
TCheckSchemaRegionUsingTemplateReq(consensusGroupIdList))) {
+
+ @Override
+ protected List<TConsensusGroupId> processResponseOfOneDataNode(
+ TDataNodeLocation dataNodeLocation,
+ List<TConsensusGroupId> consensusGroupIdList,
+ TCheckSchemaRegionUsingTemplateResp response) {
+ respList.add(response);
+ List<TConsensusGroupId> failedRegionList = new ArrayList<>();
+ if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return failedRegionList;
+ }
+
+ if (response.getStatus().getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ List<TSStatus> subStatus =
response.getStatus().getSubStatus();
+ for (int i = 0; i < subStatus.size(); i++) {
+ if (subStatus.get(i).getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failedRegionList.add(consensusGroupIdList.get(i));
+ }
+ }
+ } else {
+ failedRegionList.addAll(consensusGroupIdList);
+ }
+ return failedRegionList;
+ }
+
+ @Override
+ protected void onAllReplicasetFailure(
+ TConsensusGroupId consensusGroupId, Set<TDataNodeLocation>
dataNodeLocationSet) {
+ exception[0] =
+ new MetadataException(
+ String.format(
+ "all replicaset of schemaRegion %s failed. %s",
+ consensusGroupId.id, dataNodeLocationSet));
+ interruptTask();
+ }
+ };
+ regionTask.execute();
+ if (exception[0] != null) {
+ throw exception[0];
+ }
+ for (TCheckSchemaRegionUsingTemplateResp resp : respList) {
+ if (resp.result) {
+ throw new PathNotExistException(
+ deleteDatabasePatternPaths.stream()
+ .map(PartialPath::getFullPath)
+ .collect(Collectors.toList()),
+ false);
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
index bbe97b0ada4..498863b15c3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
@@ -19,9 +19,7 @@
package org.apache.iotdb.confignode.procedure.impl.schema;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -42,8 +40,6 @@ import
org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUtil;
-import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -51,15 +47,11 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
@@ -114,7 +106,7 @@ public class UnsetTemplateProcedure
if (isFailed()) {
return Flow.NO_MORE_STATE;
}
- if (checkDataNodeTemplateActivation(env) > 0) {
+ if (checkDataNodeTemplateActivation(env)) {
setFailure(new ProcedureException(new
TemplateIsInUseException(path.getFullPath())));
return Flow.NO_MORE_STATE;
} else {
@@ -180,84 +172,22 @@ public class UnsetTemplateProcedure
}
}
- private long checkDataNodeTemplateActivation(ConfigNodeProcedureEnv env) {
+ private boolean checkDataNodeTemplateActivation(ConfigNodeProcedureEnv env) {
PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(path);
patternTree.appendPathPattern(path.concatNode(MULTI_LEVEL_PATH_WILDCARD));
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
try {
- patternTree.serialize(dataOutputStream);
- } catch (IOException ignored) {
- }
- ByteBuffer patternTreeBytes =
ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
-
- Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup =
- env.getConfigManager().getRelatedSchemaRegionGroup(patternTree);
-
- List<TCountPathsUsingTemplateResp> respList = new ArrayList<>();
- DataNodeRegionTaskExecutor<TCountPathsUsingTemplateReq,
TCountPathsUsingTemplateResp>
- regionTask =
- new DataNodeRegionTaskExecutor<
- TCountPathsUsingTemplateReq, TCountPathsUsingTemplateResp>(
- env,
- relatedSchemaRegionGroup,
- false,
- DataNodeRequestType.COUNT_PATHS_USING_TEMPLATE,
- ((dataNodeLocation, consensusGroupIdList) ->
- new TCountPathsUsingTemplateReq(
- template.getId(), patternTreeBytes,
consensusGroupIdList))) {
-
- @Override
- protected List<TConsensusGroupId> processResponseOfOneDataNode(
- TDataNodeLocation dataNodeLocation,
- List<TConsensusGroupId> consensusGroupIdList,
- TCountPathsUsingTemplateResp response) {
- respList.add(response);
- List<TConsensusGroupId> failedRegionList = new ArrayList<>();
- if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return failedRegionList;
- }
-
- if (response.getStatus().getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- List<TSStatus> subStatus =
response.getStatus().getSubStatus();
- for (int i = 0; i < subStatus.size(); i++) {
- if (subStatus.get(i).getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failedRegionList.add(consensusGroupIdList.get(i));
- }
- }
- } else {
- failedRegionList.addAll(consensusGroupIdList);
- }
- return failedRegionList;
- }
-
- @Override
- protected void onAllReplicasetFailure(
- TConsensusGroupId consensusGroupId, Set<TDataNodeLocation>
dataNodeLocationSet) {
- setFailure(
- new ProcedureException(
- new MetadataException(
- String.format(
- "Unset template %s from %s failed when [check
DataNode template activation] because all replicaset of schemaRegion %s failed.
%s",
- template.getName(),
- path,
- consensusGroupId.id,
- dataNodeLocationSet))));
- interruptTask();
- }
- };
- regionTask.execute();
- if (isFailed()) {
- return 0;
- }
-
- long result = 0;
- for (TCountPathsUsingTemplateResp resp : respList) {
- result += resp.getCount();
+ return SchemaUtils.checkDataNodeTemplateActivation(
+ env.getConfigManager(), patternTree, template);
+ } catch (MetadataException e) {
+ setFailure(
+ new ProcedureException(
+ new MetadataException(
+ String.format(
+ "Unset template %s from %s failed when [check DataNode
template activation] because %s",
+ template.getName(), path, e.getMessage()))));
+ return false;
}
-
- return result;
}
private void unsetTemplate(ConfigNodeProcedureEnv env) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index db3c2030c43..120637847be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -147,6 +147,8 @@ import
org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
@@ -251,6 +253,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -775,8 +778,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
@Override
- public TCountPathsUsingTemplateResp
countPathsUsingTemplate(TCountPathsUsingTemplateReq req)
- throws TException {
+ public TCountPathsUsingTemplateResp
countPathsUsingTemplate(TCountPathsUsingTemplateReq req) {
PathPatternTree patternTree = PathPatternTree.deserialize(req.patternTree);
TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp();
AtomicLong result = new AtomicLong(0);
@@ -810,6 +812,33 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return resp;
}
+ @Override
+ public TCheckSchemaRegionUsingTemplateResp checkSchemaRegionUsingTemplate(
+ TCheckSchemaRegionUsingTemplateReq req) {
+ TCheckSchemaRegionUsingTemplateResp resp = new
TCheckSchemaRegionUsingTemplateResp();
+ AtomicBoolean result = new AtomicBoolean(false);
+ resp.setStatus(
+ executeInternalSchemaTask(
+ req.getSchemaRegionIdList(),
+ consensusGroupId -> {
+ ReadWriteLock readWriteLock =
+ regionManager.getRegionLock(new
SchemaRegionId(consensusGroupId.getId()));
+ readWriteLock.writeLock().lock();
+ try {
+ ISchemaRegion schemaRegion =
+ schemaEngine.getSchemaRegion(new
SchemaRegionId(consensusGroupId.getId()));
+ if
(schemaRegion.getSchemaRegionStatistics().getTemplateActivatedNumber() > 0) {
+ result.set(true);
+ }
+ return RpcUtils.SUCCESS_STATUS;
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }));
+ resp.setResult(result.get());
+ return resp;
+ }
+
@Override
public TCheckTimeSeriesExistenceResp
checkTimeSeriesExistence(TCheckTimeSeriesExistenceReq req) {
PathPatternTree patternTree = PathPatternTree.deserialize(req.patternTree);
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 53c42c1e26a..f5d6839ce6c 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -413,6 +413,15 @@ struct TCountPathsUsingTemplateResp {
2: optional i64 count
}
+struct TCheckSchemaRegionUsingTemplateReq{
+ 1: required list<common.TConsensusGroupId> schemaRegionIdList
+}
+
+struct TCheckSchemaRegionUsingTemplateResp {
+ 1: required common.TSStatus status
+ 2: required bool result
+}
+
struct TCheckTimeSeriesExistenceReq {
1: required binary patternTree
2: required list<common.TConsensusGroupId> schemaRegionIdList
@@ -906,6 +915,8 @@ service IDataNodeRPCService {
TCountPathsUsingTemplateResp
countPathsUsingTemplate(TCountPathsUsingTemplateReq req)
+ TCheckSchemaRegionUsingTemplateResp
checkSchemaRegionUsingTemplate(TCheckSchemaRegionUsingTemplateReq req)
+
TCheckTimeSeriesExistenceResp
checkTimeSeriesExistence(TCheckTimeSeriesExistenceReq req)
common.TSStatus
constructViewSchemaBlackList(TConstructViewSchemaBlackListReq req)