This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ff2b2d774 [IOTDB-4559]DropTrigger process on ConfigNode
2ff2b2d774 is described below

commit 2ff2b2d774be5131e89363898bf355325fd7d2a0
Author: Weihao Li <[email protected]>
AuthorDate: Fri Sep 30 11:27:24 2022 +0800

    [IOTDB-4559]DropTrigger process on ConfigNode
---
 .../iotdb/confignode/manager/ProcedureManager.java |  19 +++
 .../iotdb/confignode/manager/TriggerManager.java   |   3 +-
 .../iotdb/confignode/persistence/TriggerInfo.java  |  14 ++
 .../procedure/env/ConfigNodeProcedureEnv.java      |   9 +-
 .../procedure/impl/CreateTriggerProcedure.java     |   5 +-
 .../procedure/impl/DropTriggerProcedure.java       | 175 +++++++++++++++++++++
 .../procedure/state/DropTriggerState.java          |  27 ++++
 .../procedure/store/ProcedureFactory.java          |   9 +-
 .../procedure/impl/DropTriggerProcedureTest.java   |  55 +++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +-
 10 files changed, 308 insertions(+), 11 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 6b9f3e819a..656e5880f6 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
 import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.DropTriggerProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
@@ -244,6 +245,24 @@ public class ProcedureManager {
     }
   }
 
+  /**
+   * Generate a DropTriggerProcedure and wait for it to finish
+   *
+   * @return SUCCESS_STATUS if trigger dropped successfully, 
DROP_TRIGGER_ERROR otherwise
+   */
+  public TSStatus dropTrigger(String triggerName) {
+    long procedureId = executor.submitProcedure(new 
DropTriggerProcedure(triggerName));
+    List<TSStatus> statusList = new ArrayList<>();
+    boolean isSucceed =
+        waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
+    if (isSucceed) {
+      return RpcUtils.SUCCESS_STATUS;
+    } else {
+      return new TSStatus(TSStatusCode.DROP_TRIGGER_ERROR.getStatusCode())
+          .setMessage(statusList.get(0).getMessage());
+    }
+  }
+
   /**
    * Waiting until the specific procedures finished
    *
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index cc329952d6..4100251d30 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -93,8 +93,7 @@ public class TriggerManager {
   }
 
   public TSStatus dropTrigger(TDropTriggerReq req) {
-    // TODO
-    return null;
+    return 
configManager.getProcedureManager().dropTrigger(req.getTriggerName());
   }
 
   public TGetTriggerTableResp getTriggerTable() {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index e9f4fac2fd..77c7464be9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -113,6 +113,20 @@ public class TriggerInfo implements SnapshotProcessor {
     }
   }
 
+  /**
+   * Validate whether the trigger can be dropped, i.e. whether it has been created
+   *
+   * @param triggerName name of the trigger to drop
+   */
+  public void validate(String triggerName) {
+    if (triggerTable.containsTrigger(triggerName)) {
+      return;
+    }
+    throw new TriggerManagementException(
+        String.format(
+            "Failed to drop trigger [%s], this trigger has not been created", 
triggerName));
+  }
+
   public boolean needToSaveJar(String jarName) {
     return !existedJarToMD5.containsKey(jarName);
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index f3778361a2..c8133dda1a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -392,15 +392,14 @@ public class ConfigNodeProcedureEnv {
     return dataNodeResponseStatus;
   }
 
-  public List<TSStatus> dropTriggerOnDataNodes(TriggerInformation 
triggerInformation)
-      throws IOException {
+  public List<TSStatus> dropTriggerOnDataNodes(String triggerName, boolean 
needToDeleteJarFile) {
     NodeManager nodeManager = configManager.getNodeManager();
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         nodeManager.getRegisteredDataNodeLocations();
     final List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
     final TDropTriggerInstanceReq request =
-        new TDropTriggerInstanceReq(triggerInformation.getTriggerName(), 
false);
+        new TDropTriggerInstanceReq(triggerName, needToDeleteJarFile);
     AsyncDataNodeClientPool.getInstance()
         .sendAsyncRequestToDataNodeWithRetry(
             request,
@@ -410,7 +409,7 @@ public class ConfigNodeProcedureEnv {
     return dataNodeResponseStatus;
   }
 
-  public List<TSStatus> activeTriggerOnDataNodes(String triggerName) throws 
IOException {
+  public List<TSStatus> activeTriggerOnDataNodes(String triggerName) {
     NodeManager nodeManager = configManager.getNodeManager();
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         nodeManager.getRegisteredDataNodeLocations();
@@ -427,7 +426,7 @@ public class ConfigNodeProcedureEnv {
     return dataNodeResponseStatus;
   }
 
-  public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) throws 
IOException {
+  public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) {
     NodeManager nodeManager = configManager.getNodeManager();
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         nodeManager.getRegisteredDataNodeLocations();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
index 85bdfaa021..a016160087 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -44,7 +44,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-/** remove config node procedure */
+/** create trigger procedure */
 public class CreateTriggerProcedure extends 
AbstractNodeProcedure<CreateTriggerState> {
   private static final Logger LOG = 
LoggerFactory.getLogger(CreateTriggerProcedure.class);
   private static final int retryThreshold = 5;
@@ -202,7 +202,8 @@ public class CreateTriggerProcedure extends 
AbstractNodeProcedure<CreateTriggerS
             "Start to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
             triggerInformation.getTriggerName());
 
-        if 
(RpcUtils.squashResponseStatusList(env.dropTriggerOnDataNodes(triggerInformation))
+        if (RpcUtils.squashResponseStatusList(
+                    
env.dropTriggerOnDataNodes(triggerInformation.getTriggerName(), false))
                 .getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         } else {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
new file mode 100644
index 0000000000..7902466653
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedure.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.DropTriggerState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** drop trigger procedure */
+public class DropTriggerProcedure extends 
AbstractNodeProcedure<DropTriggerState> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DropTriggerProcedure.class);
+  private static final int retryThreshold = 5;
+
+  private String triggerName;
+
+  public DropTriggerProcedure() {
+    super();
+  }
+
+  public DropTriggerProcedure(String triggerName) {
+    super();
+    this.triggerName = triggerName;
+  }
+
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, DropTriggerState 
state) {
+    if (triggerName == null) {
+      return Flow.NO_MORE_STATE;
+    }
+    try {
+      switch (state) {
+        case INIT:
+          LOG.info("Start to drop trigger [{}]", triggerName);
+
+          TriggerInfo triggerInfo = 
env.getConfigManager().getTriggerManager().getTriggerInfo();
+          triggerInfo.acquireTriggerTableLock();
+
+          triggerInfo.validate(triggerName);
+
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new UpdateTriggerStateInTablePlan(triggerName, 
TTriggerState.DROPPING));
+          setNextState(DropTriggerState.CONFIG_NODE_DROPPING);
+          break;
+
+        case CONFIG_NODE_DROPPING:
+          LOG.info("Start to drop trigger [{}] on Data Nodes", triggerName);
+
+          // TODO consider using reference counts to determine whether to 
remove jar
+          if 
(RpcUtils.squashResponseStatusList(env.dropTriggerOnDataNodes(triggerName, 
false))
+                  .getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            setNextState(DropTriggerState.DATA_NODE_DROPPED);
+          } else {
+            throw new TriggerManagementException(
+                String.format("Fail to drop trigger [%s] on Data Nodes", 
triggerName));
+          }
+          break;
+
+        case DATA_NODE_DROPPED:
+          LOG.info("Start to drop trigger [{}] on Config Nodes", triggerName);
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new DeleteTriggerInTablePlan(triggerName));
+          setNextState(DropTriggerState.CONFIG_NODE_DROPPED);
+          break;
+
+        case CONFIG_NODE_DROPPED:
+          
env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+          return Flow.NO_MORE_STATE;
+      }
+    } catch (Exception e) {
+      if (isRollbackSupported(state)) {
+        LOG.error("Fail in DropTriggerProcedure", e);
+        setFailure(new ProcedureException(e.getMessage()));
+      } else {
+        LOG.error(
+            "Retrievable error trying to drop trigger [{}], state [{}]", 
triggerName, state, e);
+        if (getCycles() > retryThreshold) {
+          setFailure(
+              new ProcedureException(
+                  String.format("Fail to drop trigger [%s] at STATE [%s]", 
triggerName, state)));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, DropTriggerState 
state)
+      throws IOException, InterruptedException, ProcedureException {
+    if (state == DropTriggerState.INIT) {
+      LOG.info("Start [INIT] rollback of trigger [{}]", triggerName);
+
+      
env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+    }
+  }
+
+  @Override
+  protected boolean isRollbackSupported(DropTriggerState state) {
+    return true;
+  }
+
+  @Override
+  protected DropTriggerState getState(int stateId) {
+    return DropTriggerState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(DropTriggerState dropTriggerState) {
+    return dropTriggerState.ordinal();
+  }
+
+  @Override
+  protected DropTriggerState getInitialState() {
+    return DropTriggerState.INIT;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    
stream.writeInt(ProcedureFactory.ProcedureType.DROP_TRIGGER_PROCEDURE.ordinal());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(triggerName, stream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    triggerName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof DropTriggerProcedure) {
+      DropTriggerProcedure thatProc = (DropTriggerProcedure) that;
+      return thatProc.getProcId() == this.getProcId()
+          && thatProc.getState() == this.getState()
+          && java.util.Objects.equals(thatProc.triggerName, this.triggerName); // null-safe
+    }
+    return false;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DropTriggerState.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DropTriggerState.java
new file mode 100644
index 0000000000..d55902497a
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DropTriggerState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state;
+
+public enum DropTriggerState {
+  INIT,
+  CONFIG_NODE_DROPPING,
+  DATA_NODE_DROPPED,
+  CONFIG_NODE_DROPPED
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 7101f1e7f5..218d976a45 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
 import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.DropTriggerProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
@@ -73,6 +74,9 @@ public class ProcedureFactory implements IProcedureFactory {
       case CREATE_TRIGGER_PROCEDURE:
         procedure = new CreateTriggerProcedure();
         break;
+      case DROP_TRIGGER_PROCEDURE:
+        procedure = new DropTriggerProcedure();
+        break;
       default:
         LOGGER.error("unknown Procedure type: " + typeNum);
         throw new IOException("unknown Procedure type: " + typeNum);
@@ -98,6 +102,8 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.DELETE_TIMESERIES_PROCEDURE;
     } else if (procedure instanceof CreateTriggerProcedure) {
       return ProcedureType.CREATE_TRIGGER_PROCEDURE;
+    } else if (procedure instanceof DropTriggerProcedure) {
+      return ProcedureType.DROP_TRIGGER_PROCEDURE;
     }
     return null;
   }
@@ -110,7 +116,8 @@ public class ProcedureFactory implements IProcedureFactory {
     REGION_MIGRATE_PROCEDURE,
     CREATE_REGION_GROUPS,
     DELETE_TIMESERIES_PROCEDURE,
-    CREATE_TRIGGER_PROCEDURE
+    CREATE_TRIGGER_PROCEDURE,
+    DROP_TRIGGER_PROCEDURE
   }
 
   private static class ProcedureFactoryHolder {
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedureTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedureTest.java
new file mode 100644
index 0000000000..324047c671
--- /dev/null
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/DropTriggerProcedureTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class DropTriggerProcedureTest {
+
+  @Test
+  public void serializeDeserializeTest() {
+
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    DropTriggerProcedure p1 = new DropTriggerProcedure("test");
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+      DropTriggerProcedure p2 =
+          (DropTriggerProcedure) ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+    } catch (Exception e) {
+      fail(e.toString()); // surface the cause instead of a bare, message-less failure
+    }
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 85315b196f..a51bf28f29 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -166,7 +166,8 @@ public enum TSStatusCode {
   REMOVE_DATANODE_FAILED(919),
   OVERLAP_WITH_EXISTING_DELETE_TIMESERIES_TASK(920),
   NOT_AVAILABLE_REGION_GROUP(921),
-  CREATE_TRIGGER_ERROR(922);
+  CREATE_TRIGGER_ERROR(922),
+  DROP_TRIGGER_ERROR(923);
 
   private int statusCode;
 

Reply via email to