This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2ff2b2d774 [IOTDB-4559]DropTrigger process on ConfigNode
2ff2b2d774 is described below
commit 2ff2b2d774be5131e89363898bf355325fd7d2a0
Author: Weihao Li <[email protected]>
AuthorDate: Fri Sep 30 11:27:24 2022 +0800
[IOTDB-4559]DropTrigger process on ConfigNode
---
.../iotdb/confignode/manager/ProcedureManager.java | 19 +++
.../iotdb/confignode/manager/TriggerManager.java | 3 +-
.../iotdb/confignode/persistence/TriggerInfo.java | 14 ++
.../procedure/env/ConfigNodeProcedureEnv.java | 9 +-
.../procedure/impl/CreateTriggerProcedure.java | 5 +-
.../procedure/impl/DropTriggerProcedure.java | 175 +++++++++++++++++++++
.../procedure/state/DropTriggerState.java | 27 ++++
.../procedure/store/ProcedureFactory.java | 9 +-
.../procedure/impl/DropTriggerProcedureTest.java | 55 +++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +-
10 files changed, 308 insertions(+), 11 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 6b9f3e819a..656e5880f6 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.DropTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
@@ -244,6 +245,24 @@ public class ProcedureManager {
}
}
+ /**
+  * Submits a {@link DropTriggerProcedure} for the given trigger and waits for it to finish.
+  *
+  * @param triggerName name of the trigger to drop
+  * @return {@code RpcUtils.SUCCESS_STATUS} if the procedure succeeded; otherwise a status with
+  *     code {@code DROP_TRIGGER_ERROR} whose message is taken from the first status in the list
+  *     handed to {@code waitingProcedureFinished}
+  */
+ public TSStatus dropTrigger(String triggerName) {
+   final long dropProcedureId = executor.submitProcedure(new DropTriggerProcedure(triggerName));
+   final List<TSStatus> procedureStatuses = new ArrayList<>();
+
+   if (waitingProcedureFinished(Collections.singletonList(dropProcedureId), procedureStatuses)) {
+     return RpcUtils.SUCCESS_STATUS;
+   }
+
+   // Failure path: report the message of the first status recorded for the procedure.
+   final String failureMessage = procedureStatuses.get(0).getMessage();
+   return new TSStatus(TSStatusCode.DROP_TRIGGER_ERROR.getStatusCode()).setMessage(failureMessage);
+ }
+
/**
* Waiting until the specific procedures finished
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index cc329952d6..4100251d30 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -93,8 +93,7 @@ public class TriggerManager {
}
public TSStatus dropTrigger(TDropTriggerReq req) {
- // TODO
- return null;
+ return
configManager.getProcedureManager().dropTrigger(req.getTriggerName());
}
public TGetTriggerTableResp getTriggerTable() {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index e9f4fac2fd..77c7464be9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -113,6 +113,20 @@ public class TriggerInfo implements SnapshotProcessor {
}
}
+ /**
+  * Checks that the named trigger can be dropped, i.e. that the trigger table contains it.
+  *
+  * @param triggerName name of the trigger that is about to be dropped
+  * @throws TriggerManagementException if the trigger table does not contain {@code triggerName}
+  */
+ public void validate(String triggerName) {
+   final boolean triggerExists = triggerTable.containsTrigger(triggerName);
+   if (!triggerExists) {
+     final String reason =
+         String.format(
+             "Failed to drop trigger [%s], this trigger has not been created", triggerName);
+     throw new TriggerManagementException(reason);
+   }
+ }
+
public boolean needToSaveJar(String jarName) {
return !existedJarToMD5.containsKey(jarName);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index f3778361a2..c8133dda1a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -392,15 +392,14 @@ public class ConfigNodeProcedureEnv {
return dataNodeResponseStatus;
}
- public List<TSStatus> dropTriggerOnDataNodes(TriggerInformation
triggerInformation)
- throws IOException {
+ public List<TSStatus> dropTriggerOnDataNodes(String triggerName, boolean
needToDeleteJarFile) {
NodeManager nodeManager = configManager.getNodeManager();
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
nodeManager.getRegisteredDataNodeLocations();
final List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new
ArrayList<>(dataNodeLocationMap.size()));
final TDropTriggerInstanceReq request =
- new TDropTriggerInstanceReq(triggerInformation.getTriggerName(),
false);
+ new TDropTriggerInstanceReq(triggerName, needToDeleteJarFile);
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
request,
@@ -410,7 +409,7 @@ public class ConfigNodeProcedureEnv {
return dataNodeResponseStatus;
}
- public List<TSStatus> activeTriggerOnDataNodes(String triggerName) throws
IOException {
+ public List<TSStatus> activeTriggerOnDataNodes(String triggerName) {
NodeManager nodeManager = configManager.getNodeManager();
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
nodeManager.getRegisteredDataNodeLocations();
@@ -427,7 +426,7 @@ public class ConfigNodeProcedureEnv {
return dataNodeResponseStatus;
}
- public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) throws
IOException {
+ public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) {
NodeManager nodeManager = configManager.getNodeManager();
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
nodeManager.getRegisteredDataNodeLocations();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
index 85bdfaa021..a016160087 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -44,7 +44,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-/** remove config node procedure */
+/** create trigger procedure */
public class CreateTriggerProcedure extends
AbstractNodeProcedure<CreateTriggerState> {
private static final Logger LOG =
LoggerFactory.getLogger(CreateTriggerProcedure.class);
private static final int retryThreshold = 5;
@@ -202,7 +202,8 @@ public class CreateTriggerProcedure extends
AbstractNodeProcedure<CreateTriggerS
"Start to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
triggerInformation.getTriggerName());
- if
(RpcUtils.squashResponseStatusList(env.dropTriggerOnDataNodes(triggerInformation))
+ if (RpcUtils.squashResponseStatusList(
+
env.dropTriggerOnDataNodes(triggerInformation.getTriggerName(), false))
.getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
} else {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
new file mode 100644
index 0000000000..7902466653
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.DropTriggerState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Procedure that drops a trigger.
+ *
+ * <p>The states run in this order:
+ *
+ * <ol>
+ *   <li>{@code INIT}: acquire the trigger table lock, validate the trigger and mark it {@code
+ *       DROPPING} through consensus.
+ *   <li>{@code CONFIG_NODE_DROPPING}: drop the trigger on the DataNodes.
+ *   <li>{@code DATA_NODE_DROPPED}: delete the trigger from the trigger table through consensus.
+ *   <li>{@code CONFIG_NODE_DROPPED}: release the trigger table lock and finish.
+ * </ol>
+ */
+public class DropTriggerProcedure extends AbstractNodeProcedure<DropTriggerState> {
+  private static final Logger LOG = LoggerFactory.getLogger(DropTriggerProcedure.class);
+  private static final int retryThreshold = 5;
+
+  /** Name of the trigger to drop; stays {@code null} until set or deserialized. */
+  private String triggerName;
+
+  /** Creates an empty procedure; {@link #deserialize(ByteBuffer)} fills in the trigger name. */
+  public DropTriggerProcedure() {
+    super();
+  }
+
+  /**
+   * Creates a procedure that drops the given trigger.
+   *
+   * @param triggerName name of the trigger to drop
+   */
+  public DropTriggerProcedure(String triggerName) {
+    super();
+    this.triggerName = triggerName;
+  }
+
+  /**
+   * Executes one step of the drop workflow.
+   *
+   * @param env procedure environment giving access to the ConfigManager and the DataNodes
+   * @param state state to execute
+   * @return {@code Flow.NO_MORE_STATE} when there is no trigger name or the last state has run,
+   *     {@code Flow.HAS_MORE_STATE} otherwise (including after a failure has been recorded)
+   */
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, DropTriggerState state) {
+    if (triggerName == null) {
+      // Nothing to drop, so there is nothing to execute.
+      return Flow.NO_MORE_STATE;
+    }
+    try {
+      switch (state) {
+        case INIT:
+          LOG.info("Start to drop trigger [{}]", triggerName);
+
+          TriggerInfo triggerInfo = env.getConfigManager().getTriggerManager().getTriggerInfo();
+          // The lock is released in CONFIG_NODE_DROPPED, or in the rollback of INIT on failure.
+          triggerInfo.acquireTriggerTableLock();
+
+          // Throws TriggerManagementException if the trigger is not in the trigger table.
+          triggerInfo.validate(triggerName);
+
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new UpdateTriggerStateInTablePlan(triggerName, TTriggerState.DROPPING));
+          setNextState(DropTriggerState.CONFIG_NODE_DROPPING);
+          break;
+
+        case CONFIG_NODE_DROPPING:
+          LOG.info("Start to drop trigger [{}] on Data Nodes", triggerName);
+
+          // TODO consider using reference counts to determine whether to remove jar
+          if (RpcUtils.squashResponseStatusList(env.dropTriggerOnDataNodes(triggerName, false))
+                  .getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            setNextState(DropTriggerState.DATA_NODE_DROPPED);
+          } else {
+            throw new TriggerManagementException(
+                String.format("Fail to drop trigger [%s] on Data Nodes", triggerName));
+          }
+          break;
+
+        case DATA_NODE_DROPPED:
+          LOG.info("Start to drop trigger [{}] on Config Nodes", triggerName);
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new DeleteTriggerInTablePlan(triggerName));
+          setNextState(DropTriggerState.CONFIG_NODE_DROPPED);
+          break;
+
+        case CONFIG_NODE_DROPPED:
+          env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+          return Flow.NO_MORE_STATE;
+      }
+    } catch (Exception e) {
+      if (isRollbackSupported(state)) {
+        LOG.error("Fail in DropTriggerProcedure", e);
+        setFailure(new ProcedureException(e.getMessage()));
+      } else {
+        // NOTE: isRollbackSupported currently returns true for every state, so this retry
+        // branch is not reached; it is kept for states that may opt out of rollback later.
+        LOG.error(
+            "Retrievable error trying to drop trigger [{}], state [{}]", triggerName, state, e);
+        if (getCycles() > retryThreshold) {
+          setFailure(
+              new ProcedureException(
+                  String.format("Fail to drop trigger [%s] at STATE [%s]", triggerName, state)));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Rolls back a single state. Only {@code INIT} has work to undo: it releases the trigger table
+   * lock acquired while executing {@code INIT}. Other states are left untouched.
+   *
+   * @param env procedure environment
+   * @param state state to roll back
+   */
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, DropTriggerState state)
+      throws IOException, InterruptedException, ProcedureException {
+    if (state == DropTriggerState.INIT) {
+      LOG.info("Start [INIT] rollback of trigger [{}]", triggerName);
+
+      env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+    }
+  }
+
+  /** Every state of this procedure reports that rollback is supported. */
+  @Override
+  protected boolean isRollbackSupported(DropTriggerState state) {
+    return true;
+  }
+
+  /** Maps a state id back to its state; ids are the enum ordinals (see {@link #getStateId}). */
+  @Override
+  protected DropTriggerState getState(int stateId) {
+    return DropTriggerState.values()[stateId];
+  }
+
+  /** Maps a state to its id, which is the enum ordinal. */
+  @Override
+  protected int getStateId(DropTriggerState dropTriggerState) {
+    return dropTriggerState.ordinal();
+  }
+
+  @Override
+  protected DropTriggerState getInitialState() {
+    return DropTriggerState.INIT;
+  }
+
+  /**
+   * Writes the procedure type ordinal, then the superclass state, then the trigger name.
+   *
+   * @param stream destination stream
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeInt(ProcedureFactory.ProcedureType.DROP_TRIGGER_PROCEDURE.ordinal());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(triggerName, stream);
+  }
+
+  /**
+   * Reads the superclass state followed by the trigger name.
+   *
+   * <p>NOTE(review): the leading type ordinal written by {@link #serialize} is not read here;
+   * presumably ProcedureFactory consumes it before calling this method - confirm.
+   *
+   * @param byteBuffer buffer positioned at the superclass state
+   */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    triggerName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  /**
+   * Two procedures are equal when their procedure id, state and trigger name all match. The
+   * trigger name comparison is null-safe because the no-arg constructor leaves the name unset.
+   */
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    }
+    if (!(that instanceof DropTriggerProcedure)) {
+      return false;
+    }
+    DropTriggerProcedure thatProc = (DropTriggerProcedure) that;
+    return thatProc.getProcId() == this.getProcId()
+        && thatProc.getState() == this.getState()
+        && java.util.Objects.equals(thatProc.triggerName, this.triggerName);
+  }
+
+  /** Hashes the same fields that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hash(getProcId(), getState(), triggerName);
+  }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DropTriggerState.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DropTriggerState.java
new file mode 100644
index 0000000000..d55902497a
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DropTriggerState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state;
+
+public enum DropTriggerState { // order matters: DropTriggerProcedure maps states via ordinal()
+ INIT, // lock the trigger table, validate the trigger and mark it DROPPING
+ CONFIG_NODE_DROPPING, // drop the trigger on the DataNodes
+ DATA_NODE_DROPPED, // delete the trigger from the trigger table through consensus
+ CONFIG_NODE_DROPPED // release the trigger table lock; the procedure is finished
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 7101f1e7f5..218d976a45 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.DropTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
@@ -73,6 +74,9 @@ public class ProcedureFactory implements IProcedureFactory {
case CREATE_TRIGGER_PROCEDURE:
procedure = new CreateTriggerProcedure();
break;
+ case DROP_TRIGGER_PROCEDURE:
+ procedure = new DropTriggerProcedure();
+ break;
default:
LOGGER.error("unknown Procedure type: " + typeNum);
throw new IOException("unknown Procedure type: " + typeNum);
@@ -98,6 +102,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.DELETE_TIMESERIES_PROCEDURE;
} else if (procedure instanceof CreateTriggerProcedure) {
return ProcedureType.CREATE_TRIGGER_PROCEDURE;
+ } else if (procedure instanceof DropTriggerProcedure) {
+ return ProcedureType.DROP_TRIGGER_PROCEDURE;
}
return null;
}
@@ -110,7 +116,8 @@ public class ProcedureFactory implements IProcedureFactory {
REGION_MIGRATE_PROCEDURE,
CREATE_REGION_GROUPS,
DELETE_TIMESERIES_PROCEDURE,
- CREATE_TRIGGER_PROCEDURE
+ CREATE_TRIGGER_PROCEDURE,
+ DROP_TRIGGER_PROCEDURE
}
private static class ProcedureFactoryHolder {
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedureTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedureTest.java
new file mode 100644
index 0000000000..324047c671
--- /dev/null
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedureTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests for {@link DropTriggerProcedure}. */
+public class DropTriggerProcedureTest {
+
+  /**
+   * Serializes a procedure, rebuilds it from the bytes through {@link ProcedureFactory} and
+   * expects the rebuilt procedure to equal the original one.
+   */
+  @Test
+  public void serializeDeserializeTest() {
+
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    DropTriggerProcedure p1 = new DropTriggerProcedure("test");
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+      DropTriggerProcedure p2 =
+          (DropTriggerProcedure) ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+    } catch (Exception e) {
+      // Report the cause; a bare fail() would hide why the round trip broke.
+      fail("Unexpected exception during serialize/deserialize round trip: " + e);
+    }
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 85315b196f..a51bf28f29 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -166,7 +166,8 @@ public enum TSStatusCode {
REMOVE_DATANODE_FAILED(919),
OVERLAP_WITH_EXISTING_DELETE_TIMESERIES_TASK(920),
NOT_AVAILABLE_REGION_GROUP(921),
- CREATE_TRIGGER_ERROR(922);
+ CREATE_TRIGGER_ERROR(922),
+ DROP_TRIGGER_ERROR(923);
private int statusCode;