This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 46e49622ba [IOTDB-4500]CreateTrigger and ShowTrigger process on 
ConfigNode (#7474)
46e49622ba is described below

commit 46e49622bab4cbddbea2fde1f1ad97f6216e3361
Author: Weihao Li <[email protected]>
AuthorDate: Thu Sep 29 18:06:19 2022 +0800

    [IOTDB-4500]CreateTrigger and ShowTrigger process on ConfigNode (#7474)
---
 .../confignode/client/DataNodeRequestType.java     |   6 +
 .../async/datanode/AsyncDataNodeClientPool.java    |  33 +++
 .../async/handlers/TriggerManagementHandler.java   |  72 +++++
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  21 ++
 .../confignode/conf/ConfigNodeDescriptor.java      |   2 +
 .../consensus/request/ConfigPhysicalPlan.java      |  16 ++
 .../consensus/request/ConfigPhysicalPlanType.java  |   6 +-
 .../request/read/GetTriggerTablePlan.java          |  33 +--
 .../write/trigger/AddTriggerInTablePlan.java       |  84 ++++++
 .../write/trigger/DeleteTriggerInTablePlan.java    |  62 +++++
 .../trigger/UpdateTriggerStateInTablePlan.java     |  75 ++++++
 .../consensus/response/TriggerTableResp.java       |  70 +++++
 .../iotdb/confignode/manager/ConfigManager.java    |  38 +++
 .../apache/iotdb/confignode/manager/IManager.java  |  19 ++
 .../iotdb/confignode/manager/ProcedureManager.java |  22 ++
 .../iotdb/confignode/manager/TriggerManager.java   | 113 ++++++++
 .../iotdb/confignode/manager/node/NodeManager.java |  24 ++
 .../iotdb/confignode/persistence/TriggerInfo.java  | 161 +++++++++++
 .../persistence/executor/ConfigPlanExecutor.java   |  15 ++
 .../procedure/env/ConfigNodeProcedureEnv.java      |  82 ++++++
 .../procedure/impl/CreateTriggerProcedure.java     | 294 +++++++++++++++++++++
 .../procedure/state/CreateTriggerState.java        |  19 +-
 .../procedure/store/ProcedureFactory.java          |   9 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  10 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  68 +++++
 .../procedure/impl/CreateTriggerProcedureTest.java | 111 ++++++++
 .../confignode1conf/iotdb-confignode.properties    |   1 +
 .../confignode2conf/iotdb-confignode.properties    |   1 +
 .../confignode3conf/iotdb-confignode.properties    |   1 +
 .../org/apache/iotdb/trigger/SimpleTrigger.java    |  22 +-
 .../iotdb/trigger/{ => old}/AlertingExample.java   |   2 +-
 .../iotdb/trigger/{ => old}/TriggerExample.java    |   2 +-
 .../commons/executable/ExecutableManager.java      |   4 +-
 .../org/apache/iotdb/commons/path/AlignedPath.java |   4 +-
 .../apache/iotdb/commons/path/MeasurementPath.java |   3 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |  23 +-
 .../org/apache/iotdb/commons/path/PathType.java    |   4 +-
 .../iotdb/commons/trigger/TriggerInformation.java  |  26 +-
 .../apache/iotdb/commons/trigger/TriggerTable.java |  13 +-
 ...ption.java => TriggerJarTooLargeException.java} |   6 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |   2 +-
 .../db/mpp/common/header/ColumnHeaderConstant.java |  13 +-
 .../config/executor/ClusterConfigTaskExecutor.java |   6 +-
 .../config/metadata/ShowTriggersTask.java          |  16 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  23 +-
 .../trigger/service/TriggerManagementService.java  |   8 +-
 .../datanode1conf/iotdb-datanode.properties        |   3 +-
 .../datanode2conf/iotdb-datanode.properties        |   4 +-
 .../datanode3conf/iotdb-datanode.properties        |   1 +
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +-
 .../src/main/thrift/confignode.thrift              |   6 +-
 thrift/src/main/thrift/datanode.thrift             |  21 +-
 .../iotdb/trigger/api/enums/TriggerType.java       |   2 +-
 .../org/apache/iotdb/tsfile/read/common/Path.java  |   6 +-
 54 files changed, 1576 insertions(+), 115 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 053e31a9d5..e3ac3c249e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -54,6 +54,12 @@ public enum DataNodeRequestType {
   CREATE_FUNCTION,
   DROP_FUNCTION,
 
+  /** Trigger */
+  CREATE_TRIGGER_INSTANCE,
+  DROP_TRIGGER_INSTANCE,
+  ACTIVE_TRIGGER_INSTANCE,
+  INACTIVE_TRIGGER_INSTANCE,
+
   /** TEMPLATE */
   UPDATE_TEMPLATE,
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index b1ee6ee6f5..746fed9db4 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -45,18 +45,23 @@ import 
org.apache.iotdb.confignode.client.async.handlers.MergeHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.RollbackSchemaBlackListHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.SetSystemStatusHandler;
 import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
+import 
org.apache.iotdb.confignode.client.async.handlers.TriggerManagementHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.UpdateConfigNodeGroupHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
 import org.apache.iotdb.confignode.client.async.task.AbstractDataNodeTask;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -123,6 +128,18 @@ public class AsyncDataNodeClientPool {
                     dataNodeLocationMap,
                     dataNodeResponseStatus);
             break;
+          case CREATE_TRIGGER_INSTANCE:
+          case DROP_TRIGGER_INSTANCE:
+          case ACTIVE_TRIGGER_INSTANCE:
+          case INACTIVE_TRIGGER_INSTANCE:
+            handler =
+                new TriggerManagementHandler(
+                    countDownLatch,
+                    requestType,
+                    targetDataNode,
+                    dataNodeLocationMap,
+                    dataNodeResponseStatus);
+            break;
           case FULL_MERGE:
           case MERGE:
             handler =
@@ -276,6 +293,22 @@ public class AsyncDataNodeClientPool {
         case DROP_FUNCTION:
           client.dropFunction((TDropFunctionRequest) req, 
(FunctionManagementHandler) handler);
           break;
+        case CREATE_TRIGGER_INSTANCE:
+          client.createTriggerInstance(
+              (TCreateTriggerInstanceReq) req, (TriggerManagementHandler) 
handler);
+          break;
+        case DROP_TRIGGER_INSTANCE:
+          client.dropTriggerInstance(
+              (TDropTriggerInstanceReq) req, (TriggerManagementHandler) 
handler);
+          break;
+        case ACTIVE_TRIGGER_INSTANCE:
+          client.activeTriggerInstance(
+              (TActiveTriggerInstanceReq) req, (TriggerManagementHandler) 
handler);
+          break;
+        case INACTIVE_TRIGGER_INSTANCE:
+          client.inactiveTriggerInstance(
+              (TInactiveTriggerInstanceReq) req, (TriggerManagementHandler) 
handler);
+          break;
         case MERGE:
         case FULL_MERGE:
           client.merge((MergeHandler) handler);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
new file mode 100644
index 0000000000..00d8047bdf
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class TriggerManagementHandler extends AbstractRetryHandler
+    implements AsyncMethodCallback<TSStatus> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TriggerManagementHandler.class);
+
+  private final List<TSStatus> dataNodeResponseStatus;
+
+  public TriggerManagementHandler(
+      CountDownLatch countDownLatch,
+      DataNodeRequestType requestType,
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      List<TSStatus> dataNodeResponseStatus) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+    this.dataNodeResponseStatus = dataNodeResponseStatus;
+  }
+
+  @Override
+  public void onComplete(TSStatus response) {
+    dataNodeResponseStatus.add(response);
+    if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
+      LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, 
targetDataNode);
+    } else {
+      LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, 
targetDataNode);
+    }
+    countDownLatch.countDown();
+  }
+
+  @Override
+  public void onError(Exception exception) {
+    dataNodeResponseStatus.add(
+        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage(targetDataNode + exception.getMessage()));
+    LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, 
targetDataNode);
+    countDownLatch.countDown();
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 56988a9176..29fdac0f6e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -100,6 +100,10 @@ public class ConfigNodeConfig {
   private String udfLibDir =
       IoTDBConstant.EXT_FOLDER_NAME + File.separator + 
IoTDBConstant.UDF_FOLDER_NAME;
 
+  /** External lib directory for Trigger, stores user-uploaded JAR files */
+  private String triggerLibDir =
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + 
IoTDBConstant.TRIGGER_FOLDER_NAME;
+
   /** External temporary lib directory for storing downloaded JAR files */
   private String temporaryLibDir =
       IoTDBConstant.EXT_FOLDER_NAME + File.separator + 
IoTDBConstant.UDF_TMP_FOLDER_NAME;
@@ -190,6 +194,7 @@ public class ConfigNodeConfig {
     extLibDir = addHomeDir(extLibDir);
     udfLibDir = addHomeDir(udfLibDir);
     temporaryLibDir = addHomeDir(temporaryLibDir);
+    triggerLibDir = addHomeDir(triggerLibDir);
   }
 
   private String addHomeDir(String dir) {
@@ -385,6 +390,22 @@ public class ConfigNodeConfig {
     this.udfLibDir = udfLibDir;
   }
 
+  public String getExtLibDir() {
+    return extLibDir;
+  }
+
+  public void setExtLibDir(String extLibDir) {
+    this.extLibDir = extLibDir;
+  }
+
+  public String getTriggerLibDir() {
+    return triggerLibDir;
+  }
+
+  public void setTriggerLibDir(String triggerLibDir) {
+    this.triggerLibDir = triggerLibDir;
+  }
+
   public String getTemporaryLibDir() {
     return temporaryLibDir;
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 2167564aa0..800bbeb0e5 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -202,6 +202,8 @@ public class ConfigNodeDescriptor {
       conf.setTemporaryLibDir(
           properties.getProperty("temporary_lib_dir", 
conf.getTemporaryLibDir()));
 
+      conf.setTriggerLibDir(properties.getProperty("trigger_lib_dir", 
conf.getTriggerLibDir()));
+
       conf.setTimePartitionInterval(
           Long.parseLong(
               properties.getProperty(
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 9073554974..d4681a860d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
 import 
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
@@ -60,6 +61,9 @@ import 
org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan
 import 
org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -216,6 +220,18 @@ public abstract class ConfigPhysicalPlan implements 
IConsensusRequest {
         case DropFunction:
           req = new DropFunctionPlan();
           break;
+        case AddTriggerInTable:
+          req = new AddTriggerInTablePlan();
+          break;
+        case DeleteTriggerInTable:
+          req = new DeleteTriggerInTablePlan();
+          break;
+        case UpdateTriggerStateInTable:
+          req = new UpdateTriggerStateInTablePlan();
+          break;
+        case GetTriggerTable:
+          req = new GetTriggerTablePlan();
+          break;
         case CreateSchemaTemplate:
           req = new CreateSchemaTemplatePlan();
           break;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index b28ccf90b2..30a00fff43 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -81,5 +81,9 @@ public enum ConfigPhysicalPlanType {
   PollRegionMaintainTask,
   CreatePipeSink,
   DropPipeSink,
-  GetPipeSink
+  GetPipeSink,
+  AddTriggerInTable,
+  DeleteTriggerInTable,
+  GetTriggerTable,
+  UpdateTriggerStateInTable
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
similarity index 57%
copy from node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
index 832667fa91..da1caa3ca7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
@@ -16,36 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.path;
 
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+package org.apache.iotdb.confignode.consensus.request.read;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public enum PathType {
-  Measurement((byte) 0),
-  Aligned((byte) 1),
-  Partial((byte) 2),
-  Path((byte) 3);
-
-  private final byte pathType;
+public class GetTriggerTablePlan extends ConfigPhysicalPlan {
 
-  PathType(byte pathType) {
-    this.pathType = pathType;
+  public GetTriggerTablePlan() {
+    super(ConfigPhysicalPlanType.GetTriggerTable);
   }
 
-  public void serialize(ByteBuffer buffer) {
-    ReadWriteIOUtils.write(pathType, buffer);
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeInt(ConfigPhysicalPlanType.GetTriggerTable.ordinal());
   }
 
-  public void serialize(DataOutputStream stream) throws IOException {
-    ReadWriteIOUtils.write(pathType, stream);
-  }
-
-  public void serialize(PublicBAOS stream) throws IOException {
-    ReadWriteIOUtils.write(pathType, stream);
-  }
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/AddTriggerInTablePlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/AddTriggerInTablePlan.java
new file mode 100644
index 0000000000..fd658e4aae
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/AddTriggerInTablePlan.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AddTriggerInTablePlan extends ConfigPhysicalPlan {
+
+  private TriggerInformation triggerInformation;
+  private Binary jarFile;
+
+  public AddTriggerInTablePlan() {
+    super(ConfigPhysicalPlanType.AddTriggerInTable);
+  }
+
+  public AddTriggerInTablePlan(TriggerInformation triggerInformation, Binary 
jarFile) {
+    super(ConfigPhysicalPlanType.AddTriggerInTable);
+    this.triggerInformation = triggerInformation;
+    this.jarFile = jarFile;
+  }
+
+  public TriggerInformation getTriggerInformation() {
+    return triggerInformation;
+  }
+
+  public void setTriggerInformation(TriggerInformation triggerInformation) {
+    this.triggerInformation = triggerInformation;
+  }
+
+  public Binary getJarFile() {
+    return jarFile;
+  }
+
+  public void setJarFile(Binary jarFile) {
+    this.jarFile = jarFile;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeInt(getType().ordinal());
+
+    triggerInformation.serialize(stream);
+    if (jarFile == null) {
+      ReadWriteIOUtils.write(true, stream);
+    } else {
+      ReadWriteIOUtils.write(false, stream);
+      ReadWriteIOUtils.write(jarFile, stream);
+    }
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    triggerInformation = TriggerInformation.deserialize(buffer);
+    if (ReadWriteIOUtils.readBool(buffer)) {
+      return;
+    }
+    jarFile = ReadWriteIOUtils.readBinary(buffer);
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/DeleteTriggerInTablePlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/DeleteTriggerInTablePlan.java
new file mode 100644
index 0000000000..c91218f86a
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/DeleteTriggerInTablePlan.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DeleteTriggerInTablePlan extends ConfigPhysicalPlan {
+
+  private String triggerName;
+
+  public DeleteTriggerInTablePlan() {
+    super(ConfigPhysicalPlanType.DeleteTriggerInTable);
+  }
+
+  public DeleteTriggerInTablePlan(String triggerName) {
+    super(ConfigPhysicalPlanType.DeleteTriggerInTable);
+    this.triggerName = triggerName;
+  }
+
+  public String getTriggerName() {
+    return triggerName;
+  }
+
+  public void setTriggerName(String triggerName) {
+    this.triggerName = triggerName;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeInt(getType().ordinal());
+
+    ReadWriteIOUtils.write(triggerName, stream);
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    triggerName = ReadWriteIOUtils.readString(buffer);
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerStateInTablePlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerStateInTablePlan.java
new file mode 100644
index 0000000000..448b69291f
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerStateInTablePlan.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class UpdateTriggerStateInTablePlan extends ConfigPhysicalPlan {
+
+  private String triggerName;
+  private TTriggerState triggerState;
+
+  public UpdateTriggerStateInTablePlan() {
+    super(ConfigPhysicalPlanType.UpdateTriggerStateInTable);
+  }
+
+  public UpdateTriggerStateInTablePlan(String triggerName, TTriggerState 
triggerState) {
+    super(ConfigPhysicalPlanType.UpdateTriggerStateInTable);
+    this.triggerName = triggerName;
+    this.triggerState = triggerState;
+  }
+
+  public String getTriggerName() {
+    return triggerName;
+  }
+
+  public void setTriggerName(String triggerName) {
+    this.triggerName = triggerName;
+  }
+
+  public TTriggerState getTriggerState() {
+    return triggerState;
+  }
+
+  public void setTriggerState(TTriggerState triggerState) {
+    this.triggerState = triggerState;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeInt(getType().ordinal());
+
+    ReadWriteIOUtils.write(triggerName, stream);
+    ReadWriteIOUtils.write(triggerState.getValue(), stream);
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    triggerName = ReadWriteIOUtils.readString(buffer);
+    triggerState = TTriggerState.findByValue(ReadWriteIOUtils.readInt(buffer));
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerTableResp.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerTableResp.java
new file mode 100644
index 0000000000..77305ac07f
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerTableResp.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TriggerTableResp implements DataSet {
+
+  private TSStatus status;
+
+  private List<TriggerInformation> allTriggerInformation;
+
+  public TriggerTableResp() {}
+
+  public TriggerTableResp(TSStatus status, List<TriggerInformation> 
allTriggerInformation) {
+    this.status = status;
+    this.allTriggerInformation = allTriggerInformation;
+  }
+
+  public TSStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(TSStatus status) {
+    this.status = status;
+  }
+
+  public List<TriggerInformation> getAllTriggerInformation() {
+    return allTriggerInformation;
+  }
+
+  public void setAllTriggerInformation(List<TriggerInformation> 
allTriggerInformation) {
+    this.allTriggerInformation = allTriggerInformation;
+  }
+
+  public TGetTriggerTableResp convertToThriftResponse() throws IOException {
+    List<ByteBuffer> triggerInformationByteBuffers = new ArrayList<>();
+
+    for (TriggerInformation triggerInformation : allTriggerInformation) {
+      triggerInformationByteBuffers.add(triggerInformation.serialize());
+    }
+
+    return new TGetTriggerTableResp(status, triggerInformationByteBuffers);
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 63be337df4..445d69fd04 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -76,6 +76,7 @@ import 
org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
 import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
@@ -83,13 +84,16 @@ import 
org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -151,6 +155,8 @@ public class ConfigManager implements IManager {
   /** UDF */
   private final UDFManager udfManager;
 
+  /** Manage Trigger */
+  private final TriggerManager triggerManager;
   /** Sync */
   private final SyncManager syncManager;
 
@@ -162,6 +168,7 @@ public class ConfigManager implements IManager {
     AuthorInfo authorInfo = new AuthorInfo();
     ProcedureInfo procedureInfo = new ProcedureInfo();
     UDFInfo udfInfo = new UDFInfo();
+    TriggerInfo triggerInfo = new TriggerInfo();
     ClusterSyncInfo syncInfo = new ClusterSyncInfo();
 
     // Build state machine and executor
@@ -173,6 +180,7 @@ public class ConfigManager implements IManager {
             authorInfo,
             procedureInfo,
             udfInfo,
+            triggerInfo,
             syncInfo);
     PartitionRegionStateMachine stateMachine = new 
PartitionRegionStateMachine(this, executor);
 
@@ -183,6 +191,7 @@ public class ConfigManager implements IManager {
     this.permissionManager = new PermissionManager(this, authorInfo);
     this.procedureManager = new ProcedureManager(this, procedureInfo);
     this.udfManager = new UDFManager(this, udfInfo);
+    this.triggerManager = new TriggerManager(this, triggerInfo);
     this.loadManager = new LoadManager(this);
     this.syncManager = new SyncManager(this, syncInfo);
 
@@ -595,6 +604,11 @@ public class ConfigManager implements IManager {
     return loadManager;
   }
 
+  @Override
+  public TriggerManager getTriggerManager() {
+    return triggerManager;
+  }
+
   @Override
   public TSStatus operatePermission(AuthorPlan authorPlan) {
     TSStatus status = confirmLeader();
@@ -747,6 +761,30 @@ public class ConfigManager implements IManager {
         : status;
   }
 
+  @Override
+  public TSStatus createTrigger(TCreateTriggerReq req) {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? triggerManager.createTrigger(req)
+        : status;
+  }
+
+  @Override
+  public TSStatus dropTrigger(TDropTriggerReq req) {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? triggerManager.dropTrigger(req)
+        : status;
+  }
+
+  @Override
+  public TGetTriggerTableResp getTriggerTable() {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? triggerManager.getTriggerTable()
+        : new TGetTriggerTableResp().setStatus(status);
+  }
+
   @Override
   public TSStatus merge() {
     TSStatus status = confirmLeader();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 38908a89a7..64519e0770 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -45,13 +45,16 @@ import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -121,6 +124,13 @@ public interface IManager {
    */
   UDFManager getUDFManager();
 
+  /**
+   * Get TriggerManager
+   *
+   * @return TriggerManager instance
+   */
+  TriggerManager getTriggerManager();
+
   /**
    * Get ProcedureManager
    *
@@ -278,6 +288,15 @@ public interface IManager {
 
   TSStatus dropFunction(String udfName);
 
+  /** Create trigger */
+  TSStatus createTrigger(TCreateTriggerReq req);
+
+  /** Drop trigger */
+  TSStatus dropTrigger(TDropTriggerReq req);
+
+  /** Show trigger & DataNode start */
+  TGetTriggerTableResp getTriggerTable();
+
   /** Merge on all DataNodes */
   TSStatus merge();
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index ecc84cf8b4..6b9f3e819a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -35,6 +36,7 @@ import 
org.apache.iotdb.confignode.procedure.ProcedureExecutor;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
@@ -52,6 +54,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Binary;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -222,6 +225,25 @@ public class ProcedureManager {
     }
   }
 
+  /**
+   * Generate CreateTriggerProcedure and wait for it finished
+   *
+   * @return SUCCESS_STATUS if trigger created successfully, 
CREATE_TRIGGER_ERROR otherwise
+   */
+  public TSStatus createTrigger(TriggerInformation triggerInformation, Binary 
jarFile) {
+    long procedureId =
+        executor.submitProcedure(new 
CreateTriggerProcedure(triggerInformation, jarFile));
+    List<TSStatus> statusList = new ArrayList<>();
+    boolean isSucceed =
+        waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
+    if (isSucceed) {
+      return RpcUtils.SUCCESS_STATUS;
+    } else {
+      return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
+          .setMessage(statusList.get(0).getMessage());
+    }
+  }
+
   /**
    * Waiting until the specific procedures finished
    *
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
new file mode 100644
index 0000000000..cc329952d6
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.trigger.api.enums.TriggerType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class TriggerManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final ConfigManager configManager;
+  private final TriggerInfo triggerInfo;
+
+  public TriggerManager(ConfigManager configManager, TriggerInfo triggerInfo) {
+    this.configManager = configManager;
+    this.triggerInfo = triggerInfo;
+  }
+
+  public TriggerInfo getTriggerInfo() {
+    return triggerInfo;
+  }
+
+  /**
+   * Create a trigger in cluster.
+   *
+   * <p>If TriggerType is STATELESS, we should create TriggerInstance on all 
DataNodes, the
+   * DataNodeLocation in TriggerInformation will be null.
+   *
+   * <p>If TriggerType is STATEFUL, we should create TriggerInstance on the 
DataNode with the lowest
+   * load, and DataNodeLocation of this DataNode will be saved.
+   *
+   * <p>All DataNodes will add TriggerInformation of this trigger in local 
TriggerTable.
+   *
+   * @param req the createTrigger request
+   * @return status of create this trigger
+   */
+  public TSStatus createTrigger(TCreateTriggerReq req) {
+    boolean isStateful = TriggerType.construct(req.getTriggerType()) == 
TriggerType.STATEFUL;
+    TDataNodeLocation dataNodeLocation =
+        isStateful ? configManager.getNodeManager().getLowestLoadDataNode() : 
null;
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            (PartialPath) PathDeserializeUtil.deserialize(req.pathPattern),
+            req.getTriggerName(),
+            req.getClassName(),
+            req.getJarPath(),
+            req.getAttributes(),
+            TriggerEvent.construct(req.triggerEvent),
+            TTriggerState.INACTIVE,
+            isStateful,
+            dataNodeLocation,
+            req.getJarMD5());
+    return configManager
+        .getProcedureManager()
+        .createTrigger(triggerInformation, new Binary(req.getJarFile()));
+  }
+
+  public TSStatus dropTrigger(TDropTriggerReq req) {
+    // TODO
+    return null;
+  }
+
+  public TGetTriggerTableResp getTriggerTable() {
+    try {
+      return ((TriggerTableResp)
+              configManager.getConsensusManager().read(new 
GetTriggerTablePlan()).getDataset())
+          .convertToThriftResponse();
+    } catch (IOException e) {
+      LOGGER.error("Fail to get TriggerTable", e);
+      return new TGetTriggerTableResp(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+              .setMessage(e.getMessage()),
+          Collections.emptyList());
+    }
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index f974a1de24..f997298e3d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -85,6 +85,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
@@ -703,6 +704,29 @@ public class NodeManager {
     return result;
   }
 
+  /**
+   * Get the DataNodeLocation of the DataNode which has the lowest load
+   *
+   * @return TDataNodeLocation
+   */
+  public TDataNodeLocation getLowestLoadDataNode() {
+    AtomicInteger result = new AtomicInteger();
+    AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
+
+    nodeCacheMap.forEach(
+        (dataNodeId, heartbeatCache) -> {
+          long score = heartbeatCache.getLoadScore();
+          if (score < lowestLoadScore.get()) {
+            result.set(dataNodeId);
+            lowestLoadScore.set(score);
+          }
+        });
+
+    LOGGER.info(
+        "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, 
lowestLoadScore);
+    return 
configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
+  }
+
   public boolean isNodeRemoving(int dataNodeId) {
     DataNodeHeartbeatCache cache =
         (DataNodeHeartbeatCache) 
configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
new file mode 100644
index 0000000000..e9f4fac2fd
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.trigger.TriggerTable;
+import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TriggerInfo implements SnapshotProcessor {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TriggerInfo.class);
+
+  private static final ConfigNodeConfig CONFIG_NODE_CONF =
+      ConfigNodeDescriptor.getInstance().getConf();
+
+  private final TriggerTable triggerTable;
+  private final Map<String, String> existedJarToMD5;
+  // private final Map<String, AtomicInteger> jarReferenceTable;
+
+  private final TriggerExecutableManager triggerExecutableManager;
+
+  private final ReentrantLock triggerTableLock = new ReentrantLock();
+
+  private final String snapshotFileName = "trigger_info.bin";
+
+  public TriggerInfo() throws IOException {
+    triggerTable = new TriggerTable();
+    existedJarToMD5 = new HashMap<>();
+    // jarReferenceTable = new ConcurrentHashMap<>();
+    triggerExecutableManager =
+        TriggerExecutableManager.setupAndGetInstance(
+            CONFIG_NODE_CONF.getTemporaryLibDir(), 
CONFIG_NODE_CONF.getTriggerLibDir());
+  }
+
+  public void acquireTriggerTableLock() {
+    LOGGER.info("acquire TriggerTableLock");
+    triggerTableLock.lock();
+  }
+
+  public void releaseTriggerTableLock() {
+    LOGGER.info("release TriggerTableLock");
+    triggerTableLock.unlock();
+  }
+
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws TException, 
IOException {
+    // TODO implement when 'Drop Trigger' done
+    return true;
+  }
+
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws TException, 
IOException {
+    // TODO implement when 'Drop Trigger' done
+  }
+
+  /**
+   * Validate whether the trigger can be created
+   *
+   * @param triggerName
+   * @param jarName
+   * @param jarMD5
+   */
+  public void validate(String triggerName, String jarName, String jarMD5) {
+    if (triggerTable.containsTrigger(triggerName)) {
+      throw new TriggerManagementException(
+          String.format(
+              "Failed to create trigger [%s], the same name trigger has been 
created",
+              triggerName));
+    }
+
+    if (existedJarToMD5.containsKey(jarName) && 
!existedJarToMD5.get(jarName).equals(jarMD5)) {
+      throw new TriggerManagementException(
+          String.format(
+              "Failed to create trigger [%s], the same name Jar [%s] but 
different MD5 [%s] has existed",
+              triggerName, jarName, jarMD5));
+    }
+  }
+
+  public boolean needToSaveJar(String jarName) {
+    return !existedJarToMD5.containsKey(jarName);
+  }
+
+  public TSStatus addTriggerInTable(AddTriggerInTablePlan physicalPlan) {
+    try {
+      TriggerInformation triggerInformation = 
physicalPlan.getTriggerInformation();
+      triggerTable.addTriggerInformation(triggerInformation.getTriggerName(), 
triggerInformation);
+      existedJarToMD5.put(triggerInformation.getJarName(), 
triggerInformation.getJarFileMD5());
+      if (physicalPlan.getJarFile() != null) {
+        triggerExecutableManager.writeToLibDir(
+            ByteBuffer.wrap(physicalPlan.getJarFile().getValues()),
+            triggerInformation.getJarName());
+      }
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (Exception e) {
+      final String errorMessage =
+          String.format(
+              "Failed to add trigger [%s] in TriggerTable on Config Nodes, 
because of %s",
+              physicalPlan.getTriggerInformation().getTriggerName(), e);
+      LOGGER.warn(errorMessage, e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(errorMessage);
+    }
+  }
+
+  public TSStatus deleteTriggerInTable(DeleteTriggerInTablePlan physicalPlan) {
+    String triggerName = physicalPlan.getTriggerName();
+    if (triggerTable.containsTrigger(triggerName)) {
+      
existedJarToMD5.remove(triggerTable.getTriggerInformation(triggerName).getJarName());
+      triggerTable.deleteTriggerInformation(triggerName);
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  public TSStatus updateTriggerStateInTable(UpdateTriggerStateInTablePlan 
physicalPlan) {
+    triggerTable.setTriggerState(physicalPlan.getTriggerName(), 
physicalPlan.getTriggerState());
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  public TriggerTableResp getTriggerTable() {
+    return new TriggerTableResp(
+        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+        triggerTable.getAllTriggerInformation());
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 6f0023c7bf..4fc66d6a81 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -61,11 +61,15 @@ import 
org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan
 import 
org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
 import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
 import 
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
 import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
@@ -105,6 +109,7 @@ public class ConfigPlanExecutor {
 
   private final UDFInfo udfInfo;
 
+  private final TriggerInfo triggerInfo;
   private final ClusterSyncInfo syncInfo;
 
   public ConfigPlanExecutor(
@@ -114,6 +119,7 @@ public class ConfigPlanExecutor {
       AuthorInfo authorInfo,
       ProcedureInfo procedureInfo,
       UDFInfo udfInfo,
+      TriggerInfo triggerInfo,
       ClusterSyncInfo syncInfo) {
     this.nodeInfo = nodeInfo;
     this.clusterSchemaInfo = clusterSchemaInfo;
@@ -121,6 +127,7 @@ public class ConfigPlanExecutor {
     this.authorInfo = authorInfo;
     this.procedureInfo = procedureInfo;
     this.udfInfo = udfInfo;
+    this.triggerInfo = triggerInfo;
     this.syncInfo = syncInfo;
   }
 
@@ -163,6 +170,8 @@ public class ConfigPlanExecutor {
         return clusterSchemaInfo.getAllTemplateSetInfo();
       case GetPipeSink:
         return syncInfo.getPipeSink((GetPipeSinkPlan) req);
+      case GetTriggerTable:
+        return triggerInfo.getTriggerTable();
       default:
         throw new UnknownPhysicalPlanTypeException(req.getType());
     }
@@ -234,6 +243,12 @@ public class ConfigPlanExecutor {
         return udfInfo.createFunction((CreateFunctionPlan) physicalPlan);
       case DropFunction:
         return udfInfo.dropFunction((DropFunctionPlan) physicalPlan);
+      case AddTriggerInTable:
+        return triggerInfo.addTriggerInTable((AddTriggerInTablePlan) 
physicalPlan);
+      case DeleteTriggerInTable:
+        return triggerInfo.deleteTriggerInTable((DeleteTriggerInTablePlan) 
physicalPlan);
+      case UpdateTriggerStateInTable:
+        return 
triggerInfo.updateTriggerStateInTable((UpdateTriggerStateInTablePlan) 
physicalPlan);
       case CreateSchemaTemplate:
         return 
clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) physicalPlan);
       case UpdateRegionLocation:
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 82dffe0031..f3778361a2 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import 
org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
@@ -49,16 +50,23 @@ import 
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Binary;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -362,6 +370,80 @@ public class ConfigNodeProcedureEnv {
     return getPartitionManager().getAllReplicaSets(storageGroup);
   }
 
+  public List<TSStatus> createTriggerOnDataNodes(
+      TriggerInformation triggerInformation, Binary jarFile) throws 
IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
+    final TCreateTriggerInstanceReq request =
+        new TCreateTriggerInstanceReq(
+            triggerInformation.serialize(), 
ByteBuffer.wrap(jarFile.getValues()));
+    // TODO: The request sent to DataNodes which stateful triggerInstance 
needn't to be created
+    // don't set
+    // JarFile
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.CREATE_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
+  public List<TSStatus> dropTriggerOnDataNodes(TriggerInformation 
triggerInformation)
+      throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
+    final TDropTriggerInstanceReq request =
+        new TDropTriggerInstanceReq(triggerInformation.getTriggerName(), 
false);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.DROP_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
+  public List<TSStatus> activeTriggerOnDataNodes(String triggerName) throws 
IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
+    final TActiveTriggerInstanceReq request = new 
TActiveTriggerInstanceReq(triggerName);
+
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.ACTIVE_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
+  public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) throws 
IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
+    final TInactiveTriggerInstanceReq request = new 
TInactiveTriggerInstanceReq(triggerName);
+
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.INACTIVE_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
   public LockQueue getNodeLock() {
     return nodeLock;
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
new file mode 100644
index 0000000000..85bdfaa021
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.CreateTriggerState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** remove config node procedure */
+public class CreateTriggerProcedure extends 
AbstractNodeProcedure<CreateTriggerState> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CreateTriggerProcedure.class);
+  private static final int retryThreshold = 5;
+
+  private TriggerInformation triggerInformation;
+  private Binary jarFile;
+
+  public CreateTriggerProcedure() {
+    super();
+  }
+
+  public CreateTriggerProcedure(TriggerInformation triggerInformation, Binary 
jarFile) {
+    super();
+    this.triggerInformation = triggerInformation;
+    this.jarFile = jarFile;
+  }
+
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, 
CreateTriggerState state) {
+    if (triggerInformation == null) {
+      return Flow.NO_MORE_STATE;
+    }
+    try {
+      switch (state) {
+        case INIT:
+          LOG.info("Start to create trigger [{}]", 
triggerInformation.getTriggerName());
+
+          TriggerInfo triggerInfo = 
env.getConfigManager().getTriggerManager().getTriggerInfo();
+          triggerInfo.acquireTriggerTableLock();
+          triggerInfo.validate(
+              triggerInformation.getTriggerName(),
+              triggerInformation.getJarName(),
+              triggerInformation.getJarFileMD5());
+          setNextState(CreateTriggerState.VALIDATED);
+          break;
+
+        case VALIDATED:
+          ConfigManager configManager = env.getConfigManager();
+          boolean needToSaveJar =
+              configManager
+                  .getTriggerManager()
+                  .getTriggerInfo()
+                  .needToSaveJar(triggerInformation.getJarName());
+
+          LOG.info(
+              "Start to add trigger [{}] in TriggerTable on Config Nodes, 
needToSaveJar[{}]",
+              triggerInformation.getTriggerName(),
+              needToSaveJar);
+
+          ConsensusWriteResponse response =
+              configManager
+                  .getConsensusManager()
+                  .write(
+                      new AddTriggerInTablePlan(
+                          triggerInformation, needToSaveJar ? jarFile : null));
+          if (!response.isSuccessful()) {
+            throw new TriggerManagementException(response.getErrorMessage());
+          }
+
+          setNextState(CreateTriggerState.CONFIG_NODE_INACTIVE);
+          break;
+
+        case CONFIG_NODE_INACTIVE:
+          LOG.info(
+              "Start to create triggerInstance [{}] on Data Nodes",
+              triggerInformation.getTriggerName());
+
+          if (RpcUtils.squashResponseStatusList(
+                      env.createTriggerOnDataNodes(triggerInformation, 
jarFile))
+                  .getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            setNextState(CreateTriggerState.DATA_NODE_INACTIVE);
+          } else {
+            throw new TriggerManagementException(
+                String.format(
+                    "Fail to create triggerInstance [%s] on Data Nodes",
+                    triggerInformation.getTriggerName()));
+          }
+          break;
+
+        case DATA_NODE_INACTIVE:
+          LOG.info(
+              "Start to active trigger [{}] on Data Nodes", 
triggerInformation.getTriggerName());
+
+          if (RpcUtils.squashResponseStatusList(
+                      
env.activeTriggerOnDataNodes(triggerInformation.getTriggerName()))
+                  .getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            setNextState(CreateTriggerState.DATA_NODE_ACTIVE);
+          } else {
+            throw new TriggerManagementException(
+                String.format(
+                    "Fail to active triggerInstance [%s] on Data Nodes",
+                    triggerInformation.getTriggerName()));
+          }
+          break;
+
+        case DATA_NODE_ACTIVE:
+          LOG.info(
+              "Start to active trigger [{}] on Config Nodes", 
triggerInformation.getTriggerName());
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(
+                  new UpdateTriggerStateInTablePlan(
+                      triggerInformation.getTriggerName(), 
TTriggerState.ACTIVE));
+          setNextState(CreateTriggerState.CONFIG_NODE_ACTIVE);
+          break;
+
+        case CONFIG_NODE_ACTIVE:
+          
env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+          return Flow.NO_MORE_STATE;
+      }
+    } catch (Exception e) {
+      if (isRollbackSupported(state)) {
+        LOG.error("Fail in CreateTriggerProcedure", e);
+        setFailure(new ProcedureException(e.getMessage()));
+      } else {
+        LOG.error(
+            "Retrievable error trying to create trigger [{}], state [{}]",
+            triggerInformation.getTriggerName(),
+            state,
+            e);
+        if (getCycles() > retryThreshold) {
+          setFailure(
+              new ProcedureException(
+                  String.format(
+                      "Fail to create trigger [%s] at STATE [%s]",
+                      triggerInformation.getTriggerName(), state)));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, CreateTriggerState 
state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case INIT:
+        LOG.info("Start [INIT] rollback of trigger [{}]", 
triggerInformation.getTriggerName());
+
+        
env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+        break;
+
+      case VALIDATED:
+        LOG.info("Start [VALIDATED] rollback of trigger [{}]", 
triggerInformation.getTriggerName());
+
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new 
DeleteTriggerInTablePlan(triggerInformation.getTriggerName()));
+        break;
+
+      case CONFIG_NODE_INACTIVE:
+        LOG.info(
+            "Start to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+            triggerInformation.getTriggerName());
+
+        if 
(RpcUtils.squashResponseStatusList(env.dropTriggerOnDataNodes(triggerInformation))
+                .getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        } else {
+          throw new TriggerManagementException(
+              String.format(
+                  "Fail to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+                  triggerInformation.getTriggerName()));
+        }
+        break;
+
+      case DATA_NODE_INACTIVE:
+        LOG.info(
+            "Start to [DATA_NODE_INACTIVE] rollback of trigger [{}]",
+            triggerInformation.getTriggerName());
+
+        if (RpcUtils.squashResponseStatusList(
+                    
env.inactiveTriggerOnDataNodes(triggerInformation.getTriggerName()))
+                .getCode()
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new TriggerManagementException(
+              String.format(
+                  "Fail to [DATA_NODE_INACTIVE] rollback of trigger [%s]",
+                  triggerInformation.getTriggerName()));
+        }
+        break;
+
+      default:
+        break;
+    }
+  }
+
+  @Override
+  protected boolean isRollbackSupported(CreateTriggerState state) {
+    return true;
+  }
+
+  @Override
+  protected CreateTriggerState getState(int stateId) {
+    return CreateTriggerState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(CreateTriggerState createTriggerState) {
+    return createTriggerState.ordinal();
+  }
+
+  @Override
+  protected CreateTriggerState getInitialState() {
+    return CreateTriggerState.INIT;
+  }
+
+  public Binary getJarFile() {
+    return jarFile;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    
stream.writeInt(ProcedureFactory.ProcedureType.CREATE_TRIGGER_PROCEDURE.ordinal());
+    super.serialize(stream);
+    triggerInformation.serialize(stream);
+    if (jarFile == null) {
+      ReadWriteIOUtils.write(true, stream);
+    } else {
+      ReadWriteIOUtils.write(false, stream);
+      ReadWriteIOUtils.write(jarFile, stream);
+    }
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    triggerInformation = TriggerInformation.deserialize(byteBuffer);
+    if (ReadWriteIOUtils.readBool(byteBuffer)) {
+      return;
+    }
+    jarFile = ReadWriteIOUtils.readBinary(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof CreateTriggerProcedure) {
+      CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that;
+      return thatProc.getProcId() == this.getProcId()
+          && thatProc.getState() == this.getState()
+          && thatProc.triggerInformation.equals(this.triggerInformation);
+    }
+    return false;
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateTriggerState.java
similarity index 68%
copy from 
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateTriggerState.java
index 82705274bf..1ab427b403 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateTriggerState.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -17,14 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.trigger.exception;
+package org.apache.iotdb.confignode.procedure.state;
 
-public class TriggerJarToLargeException extends RuntimeException {
-  public TriggerJarToLargeException(String message) {
-    super(message);
-  }
-
-  public TriggerJarToLargeException(String message, Throwable cause) {
-    super(message, cause);
-  }
+public enum CreateTriggerState {
+  INIT,
+  VALIDATED,
+  CONFIG_NODE_INACTIVE,
+  DATA_NODE_INACTIVE,
+  DATA_NODE_ACTIVE,
+  CONFIG_NODE_ACTIVE
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index fff4260307..7101f1e7f5 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.store;
 import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
@@ -69,6 +70,9 @@ public class ProcedureFactory implements IProcedureFactory {
       case DELETE_TIMESERIES_PROCEDURE:
         procedure = new DeleteTimeSeriesProcedure();
         break;
+      case CREATE_TRIGGER_PROCEDURE:
+        procedure = new CreateTriggerProcedure();
+        break;
       default:
         LOGGER.error("unknown Procedure type: " + typeNum);
         throw new IOException("unknown Procedure type: " + typeNum);
@@ -92,6 +96,8 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.CREATE_REGION_GROUPS;
     } else if (procedure instanceof DeleteTimeSeriesProcedure) {
       return ProcedureType.DELETE_TIMESERIES_PROCEDURE;
+    } else if (procedure instanceof CreateTriggerProcedure) {
+      return ProcedureType.CREATE_TRIGGER_PROCEDURE;
     }
     return null;
   }
@@ -103,7 +109,8 @@ public class ProcedureFactory implements IProcedureFactory {
     REMOVE_DATA_NODE_PROCEDURE,
     REGION_MIGRATE_PROCEDURE,
     CREATE_REGION_GROUPS,
-    DELETE_TIMESERIES_PROCEDURE
+    DELETE_TIMESERIES_PROCEDURE,
+    CREATE_TRIGGER_PROCEDURE
   }
 
   private static class ProcedureFactoryHolder {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index d252e90b49..d71919e555 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -481,20 +481,16 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
 
   @Override
   public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
-    // todo : implementation
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    return configManager.createTrigger(req);
   }
 
   @Override
   public TSStatus dropTrigger(TDropTriggerReq req) throws TException {
-    // todo : implementation
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    return configManager.dropTrigger(req);
   }
 
   public TGetTriggerTableResp getTriggerTable() throws TException {
-    // todo: implementation
-    return new TGetTriggerTableResp(
-        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), null);
+    return configManager.getTriggerTable();
   }
 
   @Override
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 7de5442d30..054bf92aac 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -34,6 +34,8 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
@@ -43,6 +45,7 @@ import 
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
 import 
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
@@ -69,6 +72,9 @@ import 
org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan
 import 
org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
 import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
 import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
 import org.apache.iotdb.confignode.procedure.Procedure;
@@ -77,11 +83,14 @@ import 
org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 import org.apache.iotdb.db.metadata.template.Template;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.junit.Assert;
@@ -873,4 +882,63 @@ public class ConfigPhysicalPlanSerDeTest {
         getPipeSinkPlanWithNullName.getPipeSinkName(),
         getPipeSinkPlanWithNullName1.getPipeSinkName());
   }
+
+  @Test
+  public void GetTriggerTablePlan() throws IOException {
+    GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan();
+    Assert.assertTrue(
+        
ConfigPhysicalPlan.Factory.create(getTriggerTablePlan0.serializeToByteBuffer())
+            instanceof GetTriggerTablePlan);
+  }
+
+  @Test
+  public void AddTriggerInTablePlanTest() throws IOException, 
IllegalPathException {
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            new PartialPath("root.test.**"),
+            "test",
+            "test.class",
+            "test.jar",
+            null,
+            TriggerEvent.AFTER_INSERT,
+            TTriggerState.INACTIVE,
+            false,
+            null,
+            "testMD5test");
+    AddTriggerInTablePlan addTriggerInTablePlan0 =
+        new AddTriggerInTablePlan(triggerInformation, new Binary(new byte[] 
{1, 2, 3}));
+    AddTriggerInTablePlan addTriggerInTablePlan1 =
+        (AddTriggerInTablePlan)
+            
ConfigPhysicalPlan.Factory.create(addTriggerInTablePlan0.serializeToByteBuffer());
+    Assert.assertEquals(
+        addTriggerInTablePlan0.getTriggerInformation(),
+        addTriggerInTablePlan1.getTriggerInformation());
+    Assert.assertEquals(addTriggerInTablePlan0.getJarFile(), 
addTriggerInTablePlan1.getJarFile());
+  }
+
+  @Test
+  public void DeleteTriggerInTablePlanTest() throws IOException {
+    DeleteTriggerInTablePlan deleteTriggerInTablePlan0 = new 
DeleteTriggerInTablePlan("test");
+    DeleteTriggerInTablePlan deleteTriggerInTablePlan1 =
+        (DeleteTriggerInTablePlan)
+            
ConfigPhysicalPlan.Factory.create(deleteTriggerInTablePlan0.serializeToByteBuffer());
+    Assert.assertEquals(
+        deleteTriggerInTablePlan0.getTriggerName(), 
deleteTriggerInTablePlan1.getTriggerName());
+  }
+
+  @Test
+  public void UpdateTriggerStateInTablePlanTest() throws IOException {
+    UpdateTriggerStateInTablePlan updateTriggerStateInTablePlan0 =
+        new UpdateTriggerStateInTablePlan("test", TTriggerState.ACTIVE);
+    UpdateTriggerStateInTablePlan updateTriggerStateInTablePlan1 =
+        (UpdateTriggerStateInTablePlan)
+            ConfigPhysicalPlan.Factory.create(
+                updateTriggerStateInTablePlan0.serializeToByteBuffer());
+    Assert.assertEquals(
+        updateTriggerStateInTablePlan0.getTriggerName(),
+        updateTriggerStateInTablePlan1.getTriggerName());
+    Assert.assertEquals(
+        updateTriggerStateInTablePlan0.getTriggerState(),
+        updateTriggerStateInTablePlan1.getTriggerState());
+  }
 }
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedureTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedureTest.java
new file mode 100644
index 0000000000..6481f6ba60
--- /dev/null
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedureTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class CreateTriggerProcedureTest {
+
+  @Test
+  public void serializeDeserializeTest() throws IllegalPathException {
+
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            new PartialPath("root.test.**"),
+            "test",
+            "test.class",
+            "test.jar",
+            null,
+            TriggerEvent.AFTER_INSERT,
+            TTriggerState.INACTIVE,
+            false,
+            null,
+            "testMD5test");
+    CreateTriggerProcedure p1 =
+        new CreateTriggerProcedure(triggerInformation, new Binary(new byte[] 
{1, 2, 3}));
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+
+      CreateTriggerProcedure p2 =
+          (CreateTriggerProcedure) 
ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+      assertEquals(p1.getJarFile(), p2.getJarFile());
+
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void serializeDeserializeWithoutJarFileTest() throws 
IllegalPathException {
+
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            new PartialPath("root.test.**"),
+            "test",
+            "test.class",
+            "test.jar",
+            null,
+            TriggerEvent.AFTER_INSERT,
+            TTriggerState.INACTIVE,
+            false,
+            null,
+            "testMD5test");
+    CreateTriggerProcedure p1 = new CreateTriggerProcedure(triggerInformation, 
null);
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+
+      CreateTriggerProcedure p2 =
+          (CreateTriggerProcedure) 
ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+      assertEquals(p1.getJarFile(), p2.getJarFile());
+
+    } catch (Exception e) {
+      fail();
+    }
+  }
+}
diff --git 
a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties 
b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
index 888d0a6ec1..a2a70d72da 100644
--- a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
@@ -29,4 +29,5 @@ data_replication_factor=3
 system_dir=target/confignode1/system
 data_dirs=target/confignode1/data
 consensus_dir=target/confignode1/consensus
+trigger_lib_dir=target/confignode1/ext/trigger
 proc_wal_dir=target/confignode1/proc
\ No newline at end of file
diff --git 
a/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties 
b/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
index 66c6045d59..b4a607642b 100644
--- a/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
@@ -28,4 +28,5 @@ data_replication_factor=3
 system_dir=target/confignode2/system
 data_dirs=target/confignode2/data
 consensus_dir=target/confignode2/consensus
+trigger_lib_dir=target/confignode2/ext/trigger
 proc_wal_dir=target/confignode2/proc
\ No newline at end of file
diff --git 
a/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties 
b/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
index 0d53e4a6dd..6fca1c3ad7 100644
--- a/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
@@ -28,4 +28,5 @@ data_replication_factor=3
 system_dir=target/confignode3/system
 data_dirs=target/confignode3/data
 consensus_dir=target/confignode3/consensus
+trigger_lib_dir=target/confignode3/ext/trigger
 proc_wal_dir=target/confignode3/proc
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
 b/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
similarity index 57%
copy from 
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
copy to 
example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
index 82705274bf..a3c797d41e 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
+++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
@@ -16,15 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.trigger;
 
-package org.apache.iotdb.commons.trigger.exception;
+import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.iotdb.tsfile.write.record.Tablet;
 
-public class TriggerJarToLargeException extends RuntimeException {
-  public TriggerJarToLargeException(String message) {
-    super(message);
-  }
+import java.util.Arrays;
+
+public class SimpleTrigger implements Trigger {
 
-  public TriggerJarToLargeException(String message, Throwable cause) {
-    super(message, cause);
+  @Override
+  public boolean fire(Tablet tablet) {
+    System.out.println("receive a tablet, device name is " + tablet.deviceId);
+    System.out.println("measurements are: ");
+    tablet
+        .getSchemas()
+        .forEach(measurementSchema -> 
System.out.println(measurementSchema.getMeasurementId()));
+    System.out.println("time are: " + Arrays.toString(tablet.timestamps));
+    return true;
   }
 }
diff --git 
a/example/trigger/src/main/java/org/apache/iotdb/trigger/AlertingExample.java 
b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
similarity index 99%
rename from 
example/trigger/src/main/java/org/apache/iotdb/trigger/AlertingExample.java
rename to 
example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
index c9bb2b663d..ec7fe2bce3 100644
--- 
a/example/trigger/src/main/java/org/apache/iotdb/trigger/AlertingExample.java
+++ 
b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.trigger;
+package org.apache.iotdb.trigger.old;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.trigger.api.Trigger;
diff --git 
a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java 
b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/TriggerExample.java
similarity index 99%
rename from 
example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
rename to 
example/trigger/src/main/java/org/apache/iotdb/trigger/old/TriggerExample.java
index a41346ecc3..6bfe230bd3 100644
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
+++ 
b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/TriggerExample.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.trigger;
+package org.apache.iotdb.trigger.old;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.trigger.api.Trigger;
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 93e81cb656..eb43c9a92d 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.commons.executable;
 
-import org.apache.iotdb.commons.trigger.exception.TriggerJarToLargeException;
+import org.apache.iotdb.commons.trigger.exception.TriggerJarTooLargeException;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.apache.commons.io.FileUtils;
@@ -196,7 +196,7 @@ public class ExecutableManager {
       long size = fileChannel.size();
       if (size > Integer.MAX_VALUE) {
         // Max length of Thrift Binary is Integer.MAX_VALUE bytes.
-        throw new TriggerJarToLargeException(
+        throw new TriggerJarTooLargeException(
             String.format("Size of file exceed %d bytes", Integer.MAX_VALUE));
       }
       ByteBuffer byteBuffer = ByteBuffer.allocate((int) size);
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
index 8ef66a4ac8..de2a16c4ad 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
@@ -31,8 +31,8 @@ import 
org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -275,7 +275,7 @@ public class AlignedPath extends PartialPath {
   }
 
   @Override
-  public void serialize(DataOutputStream stream) throws IOException {
+  public void serialize(OutputStream stream) throws IOException {
     PathType.Aligned.serialize(stream);
     super.serializeWithoutType(stream);
     ReadWriteIOUtils.write(measurementList.size(), stream);
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
index 0dcbdb66b6..ec57075d68 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
@@ -193,7 +194,7 @@ public class MeasurementPath extends PartialPath {
   }
 
   @Override
-  public void serialize(DataOutputStream stream) throws IOException {
+  public void serialize(OutputStream stream) throws IOException {
     PathType.Measurement.serialize(stream);
     super.serializeWithoutType(stream);
     if (measurementSchema == null) {
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 3122741e43..6d2bde0808 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -33,8 +33,8 @@ import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -676,22 +676,21 @@ public class PartialPath extends Path implements 
Comparable<Path>, Cloneable {
   }
 
   public ByteBuffer serialize() throws IOException {
-    PublicBAOS byteArrayOutputStream = new PublicBAOS();
-    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
-    serialize(outputStream);
-    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    PublicBAOS publicBAOS = new PublicBAOS();
+    serialize((OutputStream) publicBAOS);
+    return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {
-    PathType.Partial.serialize(byteBuffer);
-    serializeWithoutType(byteBuffer);
+  public void serialize(OutputStream stream) throws IOException {
+    PathType.Partial.serialize(stream);
+    serializeWithoutType(stream);
   }
 
   @Override
-  public void serialize(DataOutputStream stream) throws IOException {
-    PathType.Partial.serialize(stream);
-    serializeWithoutType(stream);
+  public void serialize(ByteBuffer byteBuffer) {
+    PathType.Partial.serialize(byteBuffer);
+    serializeWithoutType(byteBuffer);
   }
 
   @Override
@@ -710,7 +709,7 @@ public class PartialPath extends Path implements 
Comparable<Path>, Cloneable {
   }
 
   @Override
-  protected void serializeWithoutType(DataOutputStream stream) throws 
IOException {
+  protected void serializeWithoutType(OutputStream stream) throws IOException {
     super.serializeWithoutType(stream);
     ReadWriteIOUtils.write(nodes.length, stream);
     for (String node : nodes) {
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
index 832667fa91..6513fa6d24 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.commons.path;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public enum PathType {
@@ -41,7 +41,7 @@ public enum PathType {
     ReadWriteIOUtils.write(pathType, buffer);
   }
 
-  public void serialize(DataOutputStream stream) throws IOException {
+  public void serialize(OutputStream stream) throws IOException {
     ReadWriteIOUtils.write(pathType, stream);
   }
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
index 0d3649ff6c..119321d143 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.commons.trigger;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 import org.apache.iotdb.trigger.api.enums.TriggerEvent;
@@ -30,6 +31,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Objects;
 
 /** This Class used to save the specific information of one Trigger. */
 public class TriggerInformation {
@@ -101,7 +103,7 @@ public class TriggerInformation {
 
   public static TriggerInformation deserialize(ByteBuffer byteBuffer) {
     TriggerInformation triggerInformation = new TriggerInformation();
-    triggerInformation.pathPattern = PartialPath.deserialize(byteBuffer);
+    triggerInformation.pathPattern = (PartialPath) 
PathDeserializeUtil.deserialize(byteBuffer);
     triggerInformation.triggerName = ReadWriteIOUtils.readString(byteBuffer);
     triggerInformation.className = ReadWriteIOUtils.readString(byteBuffer);
     triggerInformation.jarName = ReadWriteIOUtils.readString(byteBuffer);
@@ -119,6 +121,28 @@ public class TriggerInformation {
     return triggerInformation;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    TriggerInformation that = (TriggerInformation) o;
+    return Objects.equals(triggerName, that.triggerName)
+        && Objects.equals(pathPattern, that.pathPattern)
+        && isStateful == that.isStateful
+        && Objects.equals(className, that.className)
+        && Objects.equals(jarName, that.jarName)
+        && Objects.equals(attributes, that.attributes)
+        && event == that.event
+        && triggerState == that.triggerState
+        && (isStateful() ? Objects.equals(dataNodeLocation, 
that.dataNodeLocation) : true)
+        && Objects.equals(jarFileMD5, that.jarFileMD5);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(triggerName);
+  }
+
   public PartialPath getPathPattern() {
     return pathPattern;
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
index a81d511b79..75f97417bb 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
@@ -22,8 +22,11 @@ import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** This Class used to save the information of Triggers and implements methods 
of manipulate it. */
 @NotThreadSafe
@@ -31,7 +34,7 @@ public class TriggerTable {
   private final Map<String, TriggerInformation> triggerTable;
 
   public TriggerTable() {
-    triggerTable = new HashMap<>();
+    triggerTable = new ConcurrentHashMap<>();
   }
 
   public TriggerTable(Map<String, TriggerInformation> triggerTable) {
@@ -74,11 +77,15 @@ public class TriggerTable {
     return allTriggerStates;
   }
   // for getTriggerTable
-  public Map<String, TriggerInformation> getTable() {
-    return triggerTable;
+  public List<TriggerInformation> getAllTriggerInformation() {
+    return new ArrayList<>(triggerTable.values());
   }
 
   public boolean isEmpty() {
     return triggerTable.isEmpty();
   }
+
+  public Map<String, TriggerInformation> getTable() {
+    return triggerTable;
+  }
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarTooLargeException.java
similarity index 82%
rename from 
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
rename to 
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarTooLargeException.java
index 82705274bf..39674a7f79 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarTooLargeException.java
@@ -19,12 +19,12 @@
 
 package org.apache.iotdb.commons.trigger.exception;
 
-public class TriggerJarToLargeException extends RuntimeException {
-  public TriggerJarToLargeException(String message) {
+public class TriggerJarTooLargeException extends RuntimeException {
+  public TriggerJarTooLargeException(String message) {
     super(message);
   }
 
-  public TriggerJarToLargeException(String message, Throwable cause) {
+  public TriggerJarTooLargeException(String message, Throwable cause) {
     super(message, cause);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java 
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index f30a46914d..ee1acf70fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -878,7 +878,7 @@ public class ConfigNodeClient
 
   @Override
   public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
-    for (int i = 0; i < RETRY_NUM; i++) {
+    for (int i = 0; i < 5; i++) {
       try {
         TSStatus status = client.createTrigger(req);
         if (!updateConfigNodeLeader(status)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index ee9dde0b24..d91eb57ca7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -77,12 +77,12 @@ public class ColumnHeaderConstant {
 
   // column names for show triggers statement
   public static final String COLUMN_TRIGGER_NAME = "Trigger Name";
-  public static final String COLUMN_TRIGGER_EVENT = "Trigger Event";
-  public static final String COLUMN_TRIGGER_TYPE = "Trigger Type";
-  public static final String COLUMN_TRIGGER_STATE = "Trigger STATE";
-  public static final String COLUMN_TRIGGER_PATTERN = "Trigger PathPattern";
-  public static final String COLUMN_TRIGGER_CLASSNAME = "Trigger ClassName";
-  public static final String COLUMN_TRIGGER_LOCATION = "Trigger Location";
+  public static final String COLUMN_TRIGGER_EVENT = "Event";
+  public static final String COLUMN_TRIGGER_TYPE = "Type";
+  public static final String COLUMN_TRIGGER_STATE = "STATE";
+  public static final String COLUMN_TRIGGER_PATTERN = "PathPattern";
+  public static final String COLUMN_TRIGGER_CLASSNAME = "ClassName";
+  public static final String COLUMN_TRIGGER_LOCATION = "Node ID";
 
   // column names for show region statement
   public static final String COLUMN_REGION_ID = "RegionId";
@@ -252,6 +252,7 @@ public class ColumnHeaderConstant {
           new ColumnHeader(COLUMN_TRIGGER_TYPE, TSDataType.TEXT),
           new ColumnHeader(COLUMN_TRIGGER_STATE, TSDataType.TEXT),
           new ColumnHeader(COLUMN_TRIGGER_PATTERN, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_TRIGGER_CLASSNAME, TSDataType.TEXT),
           new ColumnHeader(COLUMN_TRIGGER_LOCATION, TSDataType.TEXT));
 
   public static final List<ColumnHeader> showSchemaTemplateHeaders =
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index c562b9e5ab..33db99d1dc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.executable.ExecutableManager;
 import org.apache.iotdb.commons.executable.ExecutableResource;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.trigger.TriggerTable;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
@@ -429,11 +428,12 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                 getTriggerTableResp.getStatus().message, 
getTriggerTableResp.getStatus().code));
         return future;
       }
+      // convert triggerTable and buildTsBlock
+      
ShowTriggersTask.buildTsBlock(getTriggerTableResp.getAllTriggerInformation(), 
future);
     } catch (TException | IOException e) {
       future.setException(e);
     }
-    // convert triggerTable and buildTsBlock
-    ShowTriggersTask.buildTsBlock(new TriggerTable(), future);
+
     return future;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowTriggersTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowTriggersTask.java
index 3bc191101c..ccf62f1c46 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowTriggersTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowTriggersTask.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
 
 import org.apache.iotdb.commons.trigger.TriggerInformation;
-import org.apache.iotdb.commons.trigger.TriggerTable;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -37,6 +36,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -49,14 +49,16 @@ public class ShowTriggersTask implements IConfigTask {
   }
 
   public static void buildTsBlock(
-      TriggerTable triggerTable, SettableFuture<ConfigTaskResult> future) {
+      List<ByteBuffer> allTriggerInformation, SettableFuture<ConfigTaskResult> 
future) {
     List<TSDataType> outputDataTypes =
         ColumnHeaderConstant.showTriggersColumnHeaders.stream()
             .map(ColumnHeader::getColumnType)
             .collect(Collectors.toList());
     TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
-    if (triggerTable != null && !triggerTable.isEmpty()) {
-      for (TriggerInformation triggerInformation : 
triggerTable.getTable().values()) {
+    if (allTriggerInformation != null && !allTriggerInformation.isEmpty()) {
+      for (ByteBuffer triggerInformationByteBuffer : allTriggerInformation) {
+        TriggerInformation triggerInformation =
+            TriggerInformation.deserialize(triggerInformationByteBuffer);
         builder.getTimeColumnBuilder().writeLong(0L);
         builder
             .getColumnBuilder(0)
@@ -77,13 +79,15 @@ public class ShowTriggersTask implements IConfigTask {
         builder
             .getColumnBuilder(4)
             
.writeBinary(Binary.valueOf(triggerInformation.getPathPattern().toString()));
+        
builder.getColumnBuilder(5).writeBinary(Binary.valueOf(triggerInformation.getClassName()));
         builder
-            .getColumnBuilder(5)
+            .getColumnBuilder(6)
             .writeBinary(
                 Binary.valueOf(
                     !triggerInformation.isStateful()
                         ? "ALL"
-                        : 
triggerInformation.getDataNodeLocation().internalEndPoint.getIp()));
+                        : String.valueOf(
+                            
triggerInformation.getDataNodeLocation().getDataNodeId())));
         builder.declarePosition();
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index e99eab0125..a3bb7f546d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
@@ -109,6 +108,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
@@ -925,11 +925,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   public TSStatus createTriggerInstance(TCreateTriggerInstanceReq req) throws 
TException {
     TriggerInformation triggerInformation = 
TriggerInformation.deserialize(req.triggerInformation);
     try {
-      // set state to INACTIVE when creating trigger instance
-      triggerInformation.setTriggerState(TTriggerState.INACTIVE);
       // save jar file at trigger_lib_dir
-      TriggerExecutableManager.getInstance()
-          .writeToLibDir(req.jarFile, triggerInformation.getJarName());
+      if (req.getJarFile() != null) {
+        TriggerExecutableManager.getInstance()
+            .writeToLibDir(req.jarFile, triggerInformation.getJarName());
+      }
       // register trigger information with TriggerRegistrationService
       // config nodes take responsibility for synchronization control
       TriggerManagementService.getInstance().register(triggerInformation);
@@ -959,6 +959,19 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
+  @Override
+  public TSStatus inactiveTriggerInstance(TInactiveTriggerInstanceReq req) 
throws TException {
+    try {
+      TriggerManagementService.getInstance().inactiveTrigger(req.triggerName);
+    } catch (Exception e) {
+      LOGGER.error("Error occurred during ");
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(e.getMessage());
+    }
+
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   @Override
   public TSStatus dropTriggerInstance(TDropTriggerInstanceReq req) throws 
TException {
     try {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
 
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index 25e7209537..4994ec1270 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.trigger.TriggerTable;
 import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
 import org.apache.iotdb.trigger.api.Trigger;
@@ -51,6 +52,8 @@ public class TriggerManagementService {
 
   private final Map<String, TriggerExecutor> executorMap;
 
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
   private static final int DATA_NODE_ID = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
 
   private TriggerManagementService() {
@@ -155,8 +158,8 @@ public class TriggerManagementService {
                     "Failed to registered trigger %s, "
                         + "because error occurred when trying to compute md5 
of jar file for trigger %s ",
                     triggerName, triggerName);
-            LOGGER.warn(errorMessage);
-            throw e;
+            LOGGER.warn(errorMessage, e);
+            throw new TriggerManagementException(errorMessage);
           }
         }
 
@@ -214,7 +217,6 @@ public class TriggerManagementService {
               "Failed to reflect trigger instance with className(%s), because 
%s", className, e));
     }
   }
-
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // singleton instance holder
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/server/src/test/resources/datanode1conf/iotdb-datanode.properties 
b/server/src/test/resources/datanode1conf/iotdb-datanode.properties
index ad13e4a1d9..f26d613df6 100644
--- a/server/src/test/resources/datanode1conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode1conf/iotdb-datanode.properties
@@ -32,8 +32,9 @@ system_dir=target/datanode1/system
 data_dirs=target/datanode1/data
 wal_dirs=target/datanode1/wal
 index_root_dir=target/datanode1/data/index
-udf_root_dir=target/datanode1/ext
+udf_root_dir=target/datanode1/ext/udf
 tracing_dir=target/datanode1/data/tracing
 consensus_dir=target/datanode1/consensus
+trigger_root_dir=target/datanode1/ext/trigger
 sync_dir=target/datanode1/sync
 timestamp_precision=ms
diff --git a/server/src/test/resources/datanode2conf/iotdb-datanode.properties 
b/server/src/test/resources/datanode2conf/iotdb-datanode.properties
index 59d1ee3358..927da3237f 100644
--- a/server/src/test/resources/datanode2conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode2conf/iotdb-datanode.properties
@@ -32,9 +32,9 @@ system_dir=target/datanode2/system
 data_dirs=target/datanode2/data
 wal_dirs=target/datanode2/wal
 index_root_dir=target/datanode2/data/index
-udf_root_dir=target/datanode2/ext
+udf_root_dir=target/datanode2/ext/udf
 tracing_dir=target/datanode2/data/tracing
 consensus_dir=target/datanode2/consensus
-timestamp_precision=ms
+trigger_root_dir=target/datanode2/ext/trigger
 sync_dir=target/datanode2/sync
 timestamp_precision=ms
diff --git a/server/src/test/resources/datanode3conf/iotdb-datanode.properties 
b/server/src/test/resources/datanode3conf/iotdb-datanode.properties
index fa2ab7f665..37578f269c 100644
--- a/server/src/test/resources/datanode3conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode3conf/iotdb-datanode.properties
@@ -35,5 +35,6 @@ index_root_dir=target/datanode3/data/index
 udf_root_dir=target/datanode3/ext
 tracing_dir=target/datanode3/data/tracing
 consensus_dir=target/datanode3/consensus
+trigger_root_dir=target/datanode3/ext/trigger
 sync_dir=target/datanode3/sync
 timestamp_precision=ms
\ No newline at end of file
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 1bca6b5107..85315b196f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -165,7 +165,8 @@ public enum TSStatusCode {
   REGION_LEADER_CHANGE_FAILED(918),
   REMOVE_DATANODE_FAILED(919),
   OVERLAP_WITH_EXISTING_DELETE_TIMESERIES_TASK(920),
-  NOT_AVAILABLE_REGION_GROUP(921);
+  NOT_AVAILABLE_REGION_GROUP(921),
+  CREATE_TRIGGER_ERROR(922);
 
   private int statusCode;
 
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index 94a651c79e..e4ffd2a981 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -260,9 +260,7 @@ struct TDropFunctionReq {
 enum TTriggerState {
   // The intermediate state of Create trigger, the trigger need to create has 
not yet activated on any DataNodes.
   INACTIVE
-  // The intermediate state of Create trigger, the trigger need to create has 
activated on some DataNodes.
-  PARTIAL_ACTIVE
-  // Triggers on all DataNodes are available.
+  // The successful state of Create trigger, the trigger need to create has 
activated on some DataNodes.
   ACTIVE
   // The intermediate state of Drop trigger, the cluster is in the process of 
removing the trigger.
   DROPPING
@@ -289,7 +287,7 @@ struct TDropTriggerReq {
 // Get trigger table from config node
 struct TGetTriggerTableResp {
   1: required common.TSStatus status
-  2: required binary triggerTable
+  2: required list<binary> allTriggerInformation
 }
 
 // Show cluster
diff --git a/thrift/src/main/thrift/datanode.thrift 
b/thrift/src/main/thrift/datanode.thrift
index 12d3a41ddb..60cb2a2d43 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -178,6 +178,10 @@ struct TActiveTriggerInstanceReq {
   1: required string triggerName
 }
 
+struct TInactiveTriggerInstanceReq {
+  1: required string triggerName
+}
+
 struct TDropTriggerInstanceReq {
   1: required string triggerName
   2: required bool needToDeleteJarFile
@@ -430,11 +434,20 @@ service IDataNodeRPCService {
    **/
   common.TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req)
 
+
+  /**
+   * Config node will inactive a trigger instance on data node.
+   *
+   * @param trigger name.
+   **/
+  common.TSStatus inactiveTriggerInstance(TInactiveTriggerInstanceReq req)
+
+
   /**
-    * Config node will drop a trigger on all online config nodes and data 
nodes.
-    *
-    * @param trigger name, whether need to delete jar
-    **/
+   * Config node will drop a trigger on all online config nodes and data nodes.
+   *
+   * @param trigger name, whether need to delete jar
+   **/
   common.TSStatus dropTriggerInstance(TDropTriggerInstanceReq req)
 
   /**
diff --git 
a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/TriggerType.java 
b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/TriggerType.java
index 84e449fa42..bf801d7f46 100644
--- 
a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/TriggerType.java
+++ 
b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/TriggerType.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.trigger.api.enums;
 
 public enum TriggerType {
   STATEFUL((byte) 0, "STATEFUL"),
-  STATELESS((byte) 0, "STATELESS");
+  STATELESS((byte) 1, "STATELESS");
 
   private final byte id;
   private final String type;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
index 336f7edc83..e3504533d7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.Validate;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
@@ -182,7 +182,7 @@ public class Path implements Serializable, Comparable<Path> 
{
     serializeWithoutType(byteBuffer);
   }
 
-  public void serialize(DataOutputStream stream) throws IOException {
+  public void serialize(OutputStream stream) throws IOException {
     ReadWriteIOUtils.write((byte) 3, stream); // 
org.apache.iotdb.db.metadata.path#PathType
     serializeWithoutType(stream);
   }
@@ -198,7 +198,7 @@ public class Path implements Serializable, Comparable<Path> 
{
     ReadWriteIOUtils.write(fullPath, byteBuffer);
   }
 
-  protected void serializeWithoutType(DataOutputStream stream) throws 
IOException {
+  protected void serializeWithoutType(OutputStream stream) throws IOException {
     ReadWriteIOUtils.write(measurement, stream);
     ReadWriteIOUtils.write(device, stream);
     ReadWriteIOUtils.write(fullPath, stream);

Reply via email to