This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5aa6ac53da6 Fix bug: parent procedure should not execute before
sub-procedure finished (#12134)
5aa6ac53da6 is described below
commit 5aa6ac53da6c19356e23dd06141f2f59393a9685
Author: Li Yu Heng <[email protected]>
AuthorDate: Mon Mar 11 14:20:18 2024 +0800
Fix bug: parent procedure should not execute before sub-procedure finished
(#12134)
---
.../confignode/it/procedure/IoTDBProcedureIT.java | 113 ++++++++++++++++++---
.../iotdb/confignode/manager/ConfigManager.java | 7 --
.../apache/iotdb/confignode/manager/IManager.java | 9 --
.../iotdb/confignode/manager/ProcedureManager.java | 9 +-
.../confignode/procedure/ProcedureExecutor.java | 11 +-
.../AddNeverFinishSubProcedureProcedure.java | 78 ++++++++++++++
.../CreateManyDatabasesProcedure.java | 12 +--
.../impl/testonly/NeverFinishProcedure.java | 68 +++++++++++++
.../impl/testonly/ProcedureTestUtils.java} | 26 ++---
.../procedure/store/ProcedureFactory.java | 14 ++-
.../confignode/procedure/store/ProcedureType.java | 10 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 14 ++-
.../iotdb/db/protocol/client/ConfigNodeClient.java | 5 +-
.../org/apache/iotdb/commons/utils/TestOnly.java | 2 +-
.../src/main/thrift/confignode.thrift | 9 +-
15 files changed, 323 insertions(+), 64 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
index 4d0f388a33b..37cb4efd162 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.confignode.it.procedure;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.SchemaConstant;
-import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.testonly.AddNeverFinishSubProcedureProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.testonly.CreateManyDatabasesProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTestOperation;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.it.env.EnvFactory;
@@ -33,6 +35,7 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -42,13 +45,15 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import static
org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure.MAX_STATE;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.iotdb.confignode.procedure.impl.testonly.CreateManyDatabasesProcedure.MAX_STATE;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
@RunWith(IoTDBTestRunner.class)
@@ -56,6 +61,22 @@ import static
org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
public class IoTDBProcedureIT {
private static Logger LOGGER =
LoggerFactory.getLogger(IoTDBProcedureIT.class);
+ private static final TGetDatabaseReq showAllDatabasesReq;
+
+ static {
+ try {
+ showAllDatabasesReq =
+ new TGetDatabaseReq(
+ Arrays.asList(
+ new ShowDatabaseStatement(new
PartialPath(SqlConstant.getSingleRootArray()))
+ .getPathPattern()
+ .getNodes()),
+ SchemaConstant.ALL_MATCH_SCOPE.serialize());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Before
public void setUp() {
EnvFactory.getEnv()
@@ -98,19 +119,10 @@ public class IoTDBProcedureIT {
SyncConfigNodeIServiceClient leaderClient =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection();
- leaderClient.createManyDatabases();
-
- // prepare req
- final TGetDatabaseReq req =
- new TGetDatabaseReq(
- Arrays.asList(
- new ShowDatabaseStatement(new
PartialPath(SqlConstant.getSingleRootArray()))
- .getPathPattern()
- .getNodes()),
- SchemaConstant.ALL_MATCH_SCOPE.serialize());
+ leaderClient.callSpecialProcedure(TTestOperation.TEST_PROCEDURE_RECOVER);
// Make sure the procedure has not finished yet
- TShowDatabaseResp resp = leaderClient.showDatabase(req);
+ TShowDatabaseResp resp = leaderClient.showDatabase(showAllDatabasesReq);
Assert.assertTrue(resp.getDatabaseInfoMap().size() < MAX_STATE);
// Then shutdown the leader, wait the new leader exist and the procedure
continue
final int oldLeaderIndex = EnvFactory.getEnv().getLeaderConfigNodeIndex();
@@ -122,7 +134,7 @@ public class IoTDBProcedureIT {
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection();
Callable<Boolean> finalCheck =
() -> {
- TShowDatabaseResp resp1 = newLeaderClient.showDatabase(req);
+ TShowDatabaseResp resp1 =
newLeaderClient.showDatabase(showAllDatabasesReq);
if (MAX_STATE != resp1.getDatabaseInfoMap().size()) {
return false;
}
@@ -132,6 +144,77 @@ public class IoTDBProcedureIT {
.forEach(databaseName -> expectedDatabases.remove(databaseName));
return expectedDatabases.isEmpty();
};
- Awaitility.await().atMost(1, TimeUnit.MINUTES).until(finalCheck);
+ Awaitility.await().pollDelay(1, SECONDS).atMost(1,
TimeUnit.MINUTES).until(finalCheck);
+ }
+
+ /**
+ * Testing the sub-procedure functionality of the Procedure framework: the
parent procedure will
+ * not execute until the sub-procedure has finished.
+ */
+ @Test
+ public void subProcedureTest() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment(1, 1);
+
+ SyncConfigNodeIServiceClient leaderClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection();
+ leaderClient.callSpecialProcedure(TTestOperation.TEST_SUB_PROCEDURE);
+
+ boolean checkBeforeConfigNodeRestart = false;
+ try {
+ Awaitility.await()
+ .pollDelay(1, SECONDS)
+ .atMost(10, SECONDS)
+ .until(
+ () -> {
+ TShowDatabaseResp resp =
leaderClient.showDatabase(showAllDatabasesReq);
+ return resp.getDatabaseInfoMap()
+
.containsKey(AddNeverFinishSubProcedureProcedure.FAIL_DATABASE_NAME);
+ });
+ } catch (ConditionTimeoutException e) {
+ checkBeforeConfigNodeRestart = true;
+ }
+ if (!checkBeforeConfigNodeRestart) {
+ throw new Exception("checkBeforeConfigNodeRestart fail");
+ }
+
+ // Restart the ConfigNode
+ final int leaderConfigNodeIndex =
EnvFactory.getEnv().getLeaderConfigNodeIndex();
+
EnvFactory.getEnv().getConfigNodeWrapperList().get(leaderConfigNodeIndex).stop();
+
EnvFactory.getEnv().getConfigNodeWrapperList().get(leaderConfigNodeIndex).start();
+ SyncConfigNodeIServiceClient newLeaderClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection();
+ // Make sure leader ConfigNode is working
+ Awaitility.await()
+ .pollDelay(1, SECONDS)
+ .atMost(10, SECONDS)
+ .until(
+ () -> {
+ try {
+ newLeaderClient.showDatabase(showAllDatabasesReq);
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ });
+
+ boolean checkAfterConfigNodeRestart = false;
+ try {
+ Awaitility.await()
+ .pollDelay(1, SECONDS)
+ .atMost(10, SECONDS)
+ .until(
+ () -> {
+ TShowDatabaseResp resp =
newLeaderClient.showDatabase(showAllDatabasesReq);
+ return resp.getDatabaseInfoMap()
+
.containsKey(AddNeverFinishSubProcedureProcedure.FAIL_DATABASE_NAME);
+ });
+ } catch (ConditionTimeoutException e) {
+ checkAfterConfigNodeRestart = true;
+ }
+ if (!checkAfterConfigNodeRestart) {
+ throw new Exception("checkAfterConfigNodeRestart fail");
+ }
+
+ LOGGER.info("test pass");
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index ee237bd846e..9ae557b7606 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.AuthUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
@@ -618,12 +617,6 @@ public class ConfigManager implements IManager {
}
}
- @TestOnly
- @Override
- public TSStatus createManyDatabases() {
- return getProcedureManager().createManyDatabases();
- }
-
private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path,
PartialPath database) {
// The path contains `**`
if (path.getFullPath().contains(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 99a10a889df..109393ea2ca 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import
org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
@@ -327,14 +326,6 @@ public interface IManager {
*/
TSStatus deleteDatabases(TDeleteDatabasesReq tDeleteReq);
- /**
- * Create many databases.
- *
- * @return status
- */
- @TestOnly
- TSStatus createManyDatabases();
-
/**
* Get SchemaPartition.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index d84870710ce..dfd32c867ab 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -51,7 +51,6 @@ import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
@@ -76,6 +75,8 @@ import
org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedu
import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
import
org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.sync.AuthOperationProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.testonly.AddNeverFinishSubProcedureProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.testonly.CreateManyDatabasesProcedure;
import
org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
@@ -179,6 +180,12 @@ public class ProcedureManager {
return StatusUtils.OK;
}
+ @TestOnly
+ public TSStatus testSubProcedure() {
+ this.executor.submitProcedure(new AddNeverFinishSubProcedureProcedure());
+ return StatusUtils.OK;
+ }
+
public TSStatus deleteDatabases(
ArrayList<TDatabaseSchema> deleteSgSchemaList, boolean
isGeneratedByPipe) {
List<Long> procedureIds = new ArrayList<>();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 4afb5b0eb32..30284b478f5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -195,7 +195,14 @@ public class ProcedureExecutor<Env> {
waitingList.forEach(
procedure -> {
- if (procedure.hasChildren()) {
+ if (!procedure.hasChildren()) {
+ // Normally, a WAITING procedure is woken up by its children.
+ // However, there is a case where all the children have succeeded, but
+ // before they can wake up their parent procedure, the master is killed.
+ // Then, while recovering the procedures from the ProcedureWal, the
+ // children are not loaded because they are already in the SUCCESS state.
+ // So we need to continue running this WAITING procedure. Before
+ // executing it, we need to set its state to RUNNABLE.
procedure.setState(ProcedureState.RUNNABLE);
runnableList.add(procedure);
} else {
@@ -485,7 +492,7 @@ public class ProcedureExecutor<Env> {
}
/**
- * Serve as a countdown latch to check whether all children has completed.
+ * Serve as a countdown latch to check whether all children have already
completed.
*
* @param rootProcStack root procedure stack
* @param proc proc
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/AddNeverFinishSubProcedureProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/AddNeverFinishSubProcedureProcedure.java
new file mode 100644
index 00000000000..9fff83235ef
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/AddNeverFinishSubProcedureProcedure.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.testonly;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+@TestOnly
+public class AddNeverFinishSubProcedureProcedure
+ extends StateMachineProcedure<ConfigNodeProcedureEnv, Integer> {
+ public static final String FAIL_DATABASE_NAME = "root.fail";
+
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env, Integer state)
+ throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
+ if (state == 0) {
+ // the sub-procedure will never finish, so the parent procedure should never be called again
+ addChildProcedure(new NeverFinishProcedure());
+ setNextState(1);
+ return Flow.HAS_MORE_STATE;
+ }
+ if (state == 1) {
+ // the test fails if we get here: the parent resumed although its child never finished
+ ProcedureTestUtils.createDatabase(env.getConfigManager(),
FAIL_DATABASE_NAME);
+ }
+ return Flow.NO_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv configNodeProcedureEnv,
Integer integer)
+ throws IOException, InterruptedException, ProcedureException {}
+
+ @Override
+ protected Integer getState(int stateId) {
+ return stateId;
+ }
+
+ @Override
+ protected int getStateId(Integer integer) {
+ return integer;
+ }
+
+ @Override
+ protected Integer getInitialState() {
+ return 0;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+
stream.writeShort(ProcedureType.ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE.getTypeCode());
+ super.serialize(stream);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateManyDatabasesProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/CreateManyDatabasesProcedure.java
similarity index 88%
rename from
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateManyDatabasesProcedure.java
rename to
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/CreateManyDatabasesProcedure.java
index cbbba8c9a7d..7d1c0119c23 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateManyDatabasesProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/CreateManyDatabasesProcedure.java
@@ -17,16 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl;
+package org.apache.iotdb.confignode.procedure.impl.testonly;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
-import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -71,11 +69,7 @@ public class CreateManyDatabasesProcedure
private void createDatabase(ConfigNodeProcedureEnv env, int id) throws
ProcedureException {
String databaseName = DATABASE_NAME_PREFIX + id;
- TDatabaseSchema databaseSchema = new TDatabaseSchema(databaseName);
- TSStatus status =
- env.getConfigManager()
- .setDatabase(
- new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase,
databaseSchema));
+ TSStatus status =
ProcedureTestUtils.createDatabase(env.getConfigManager(), databaseName);
if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() ==
status.getCode()) {
// First mistakes are forgivable, but a second signals a problem.
if (!createFailedOnce) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/NeverFinishProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/NeverFinishProcedure.java
new file mode 100644
index 00000000000..2fd7b56d252
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/NeverFinishProcedure.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.testonly;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** This procedure will never finish. */
+@TestOnly
+public class NeverFinishProcedure extends
StateMachineProcedure<ConfigNodeProcedureEnv, Integer> {
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env, Integer state)
+ throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
+ setNextState(state + 1);
+ Thread.sleep(1000);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv configNodeProcedureEnv,
Integer state)
+ throws IOException, InterruptedException, ProcedureException {}
+
+ @Override
+ protected Integer getState(int stateId) {
+ return stateId;
+ }
+
+ @Override
+ protected int getStateId(Integer integer) {
+ return integer;
+ }
+
+ @Override
+ protected Integer getInitialState() {
+ return 0;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeShort(ProcedureType.NEVER_FINISH_PROCEDURE.getTypeCode());
+ super.serialize(stream);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/ProcedureTestUtils.java
similarity index 53%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
copy to
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/ProcedureTestUtils.java
index 089d906e91c..0c857909bf0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/ProcedureTestUtils.java
@@ -15,20 +15,20 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.confignode.procedure.impl.testonly;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
-/**
- * TestOnly implies that the class or method should only be used in the tests,
otherwise its
- * functionality is not guaranteed and may interfere with the normal code.
- */
-@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE})
-@Retention(RetentionPolicy.SOURCE)
-public @interface TestOnly {}
+public class ProcedureTestUtils {
+ public static TSStatus createDatabase(ConfigManager configManager, String
name) {
+ TDatabaseSchema databaseSchema = new TDatabaseSchema(name);
+ return configManager.setDatabase(
+ new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase,
databaseSchema));
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 5f1362a8dc6..3f94d65e06b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.procedure.store;
import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
@@ -49,6 +48,9 @@ import
org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure;
import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.testonly.AddNeverFinishSubProcedureProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.testonly.CreateManyDatabasesProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.testonly.NeverFinishProcedure;
import
org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure;
import org.apache.iotdb.confignode.service.ConfigNode;
@@ -199,6 +201,12 @@ public class ProcedureFactory implements IProcedureFactory
{
case CREATE_MANY_DATABASES_PROCEDURE:
procedure = new CreateManyDatabasesProcedure();
break;
+ case NEVER_FINISH_PROCEDURE:
+ procedure = new NeverFinishProcedure();
+ break;
+ case ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE:
+ procedure = new AddNeverFinishSubProcedureProcedure();
+ break;
default:
LOGGER.error("Unknown Procedure type: {}", typeCode);
throw new IOException("Unknown Procedure type: " + typeCode);
@@ -270,6 +278,10 @@ public class ProcedureFactory implements IProcedureFactory
{
return ProcedureType.AUTH_OPERATE_PROCEDURE;
} else if (procedure instanceof CreateManyDatabasesProcedure) {
return ProcedureType.CREATE_MANY_DATABASES_PROCEDURE;
+ } else if (procedure instanceof NeverFinishProcedure) {
+ return ProcedureType.NEVER_FINISH_PROCEDURE;
+ } else if (procedure instanceof AddNeverFinishSubProcedureProcedure) {
+ return ProcedureType.ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE;
}
throw new UnsupportedOperationException(
"Procedure type " + procedure.getClass() + " is not supported");
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 021c713b0c7..5b31f7f3b59 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.confignode.procedure.store;
+import org.apache.iotdb.commons.utils.TestOnly;
+
import java.util.HashMap;
import java.util.Map;
@@ -34,6 +36,7 @@ public enum ProcedureType {
DELETE_DATABASE_PROCEDURE((short) 200),
REGION_MIGRATE_PROCEDURE((short) 201),
CREATE_REGION_GROUPS((short) 202),
+ @TestOnly
CREATE_MANY_DATABASES_PROCEDURE((short) 203),
/** Timeseries */
@@ -93,7 +96,12 @@ public enum ProcedureType {
PIPE_ENRICHED_CREATE_TRIGGER_PROCEDURE((short) 1408),
PIPE_ENRICHED_DROP_TRIGGER_PROCEDURE((short) 1409),
PIPE_ENRICHED_AUTH_OPERATE_PROCEDURE((short) 1410),
- ;
+
+ /** Other */
+ @TestOnly
+ NEVER_FINISH_PROCEDURE((short) 66600),
+ @TestOnly
+ ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE((short) 66601);
private final short typeCode;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index ba7c07376bf..d2f3358a68b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -155,6 +155,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTestOperation;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.service.ConfigNode;
@@ -483,8 +484,17 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus createManyDatabases() throws TException {
- return configManager.createManyDatabases();
+ public TSStatus callSpecialProcedure(TTestOperation operation) throws
TException {
+ switch (operation) {
+ case TEST_PROCEDURE_RECOVER:
+ return configManager.getProcedureManager().createManyDatabases();
+ case TEST_SUB_PROCEDURE:
+ return configManager.getProcedureManager().testSubProcedure();
+ default:
+ String msg = String.format("operation %s is not supported", operation);
+ LOGGER.error(msg);
+ throw new UnsupportedOperationException(msg);
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 44c5f94c3fa..ddaf6dd75ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -124,6 +124,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTestOperation;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -464,9 +465,9 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
}
@Override
- public TSStatus createManyDatabases() throws TException {
+ public TSStatus callSpecialProcedure(TTestOperation operation) throws
TException {
return executeRemoteCallWithRetry(
- () -> client.createManyDatabases(), status ->
!updateConfigNodeLeader(status));
+ () -> client.callSpecialProcedure(operation), status ->
!updateConfigNodeLeader(status));
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
index 089d906e91c..d31cbc83195 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
@@ -29,6 +29,6 @@ import java.lang.annotation.Target;
* TestOnly implies that the class or method should only be used in the tests,
otherwise its
* functionality is not guaranteed and may interfere with the normal code.
*/
-@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE})
+@Target({ElementType.FIELD, ElementType.METHOD, ElementType.CONSTRUCTOR,
ElementType.TYPE})
@Retention(RetentionPolicy.SOURCE)
public @interface TestOnly {}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 547258623eb..97530c429c6 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -836,6 +836,13 @@ enum TActivationControl {
ALL_LICENSE_FILE_DELETED
}
+// ====================================================
+// Test only
+// ====================================================
+enum TTestOperation {
+ TEST_PROCEDURE_RECOVER,
+ TEST_SUB_PROCEDURE,
+}
service IConfigNodeRPCService {
@@ -969,7 +976,7 @@ service IConfigNodeRPCService {
TDatabaseSchemaResp getMatchedDatabaseSchemas(TGetDatabaseReq req)
/** Test only */
- common.TSStatus createManyDatabases()
+ common.TSStatus callSpecialProcedure(TTestOperation operation)
// ======================================================
// SchemaPartition