This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 46e49622ba [IOTDB-4500]CreateTrigger and ShowTrigger process on ConfigNode (#7474)
46e49622ba is described below
commit 46e49622bab4cbddbea2fde1f1ad97f6216e3361
Author: Weihao Li <[email protected]>
AuthorDate: Thu Sep 29 18:06:19 2022 +0800
[IOTDB-4500]CreateTrigger and ShowTrigger process on ConfigNode (#7474)
---
.../confignode/client/DataNodeRequestType.java | 6 +
.../async/datanode/AsyncDataNodeClientPool.java | 33 +++
.../async/handlers/TriggerManagementHandler.java | 72 +++++
.../iotdb/confignode/conf/ConfigNodeConfig.java | 21 ++
.../confignode/conf/ConfigNodeDescriptor.java | 2 +
.../consensus/request/ConfigPhysicalPlan.java | 16 ++
.../consensus/request/ConfigPhysicalPlanType.java | 6 +-
.../request/read/GetTriggerTablePlan.java | 33 +--
.../write/trigger/AddTriggerInTablePlan.java | 84 ++++++
.../write/trigger/DeleteTriggerInTablePlan.java | 62 +++++
.../trigger/UpdateTriggerStateInTablePlan.java | 75 ++++++
.../consensus/response/TriggerTableResp.java | 70 +++++
.../iotdb/confignode/manager/ConfigManager.java | 38 +++
.../apache/iotdb/confignode/manager/IManager.java | 19 ++
.../iotdb/confignode/manager/ProcedureManager.java | 22 ++
.../iotdb/confignode/manager/TriggerManager.java | 113 ++++++++
.../iotdb/confignode/manager/node/NodeManager.java | 24 ++
.../iotdb/confignode/persistence/TriggerInfo.java | 161 +++++++++++
.../persistence/executor/ConfigPlanExecutor.java | 15 ++
.../procedure/env/ConfigNodeProcedureEnv.java | 82 ++++++
.../procedure/impl/CreateTriggerProcedure.java | 294 +++++++++++++++++++++
.../procedure/state/CreateTriggerState.java | 19 +-
.../procedure/store/ProcedureFactory.java | 9 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 10 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 68 +++++
.../procedure/impl/CreateTriggerProcedureTest.java | 111 ++++++++
.../confignode1conf/iotdb-confignode.properties | 1 +
.../confignode2conf/iotdb-confignode.properties | 1 +
.../confignode3conf/iotdb-confignode.properties | 1 +
.../org/apache/iotdb/trigger/SimpleTrigger.java | 22 +-
.../iotdb/trigger/{ => old}/AlertingExample.java | 2 +-
.../iotdb/trigger/{ => old}/TriggerExample.java | 2 +-
.../commons/executable/ExecutableManager.java | 4 +-
.../org/apache/iotdb/commons/path/AlignedPath.java | 4 +-
.../apache/iotdb/commons/path/MeasurementPath.java | 3 +-
.../org/apache/iotdb/commons/path/PartialPath.java | 23 +-
.../org/apache/iotdb/commons/path/PathType.java | 4 +-
.../iotdb/commons/trigger/TriggerInformation.java | 26 +-
.../apache/iotdb/commons/trigger/TriggerTable.java | 13 +-
...ption.java => TriggerJarTooLargeException.java} | 6 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 2 +-
.../db/mpp/common/header/ColumnHeaderConstant.java | 13 +-
.../config/executor/ClusterConfigTaskExecutor.java | 6 +-
.../config/metadata/ShowTriggersTask.java | 16 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 23 +-
.../trigger/service/TriggerManagementService.java | 8 +-
.../datanode1conf/iotdb-datanode.properties | 3 +-
.../datanode2conf/iotdb-datanode.properties | 4 +-
.../datanode3conf/iotdb-datanode.properties | 1 +
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +-
.../src/main/thrift/confignode.thrift | 6 +-
thrift/src/main/thrift/datanode.thrift | 21 +-
.../iotdb/trigger/api/enums/TriggerType.java | 2 +-
.../org/apache/iotdb/tsfile/read/common/Path.java | 6 +-
54 files changed, 1576 insertions(+), 115 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 053e31a9d5..e3ac3c249e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -54,6 +54,12 @@ public enum DataNodeRequestType {
CREATE_FUNCTION,
DROP_FUNCTION,
+ /** Trigger */
+ CREATE_TRIGGER_INSTANCE,
+ DROP_TRIGGER_INSTANCE,
+ ACTIVE_TRIGGER_INSTANCE,
+ INACTIVE_TRIGGER_INSTANCE,
+
/** TEMPLATE */
UPDATE_TEMPLATE,
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index b1ee6ee6f5..746fed9db4 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -45,18 +45,23 @@ import
org.apache.iotdb.confignode.client.async.handlers.MergeHandler;
import
org.apache.iotdb.confignode.client.async.handlers.RollbackSchemaBlackListHandler;
import
org.apache.iotdb.confignode.client.async.handlers.SetSystemStatusHandler;
import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
+import org.apache.iotdb.confignode.client.async.handlers.TriggerManagementHandler;
import
org.apache.iotdb.confignode.client.async.handlers.UpdateConfigNodeGroupHandler;
import
org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
import org.apache.iotdb.confignode.client.async.task.AbstractDataNodeTask;
import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -123,6 +128,18 @@ public class AsyncDataNodeClientPool {
dataNodeLocationMap,
dataNodeResponseStatus);
break;
+ case CREATE_TRIGGER_INSTANCE:
+ case DROP_TRIGGER_INSTANCE:
+ case ACTIVE_TRIGGER_INSTANCE:
+ case INACTIVE_TRIGGER_INSTANCE:
+ handler =
+ new TriggerManagementHandler(
+ countDownLatch,
+ requestType,
+ targetDataNode,
+ dataNodeLocationMap,
+ dataNodeResponseStatus);
+ break;
case FULL_MERGE:
case MERGE:
handler =
@@ -276,6 +293,22 @@ public class AsyncDataNodeClientPool {
case DROP_FUNCTION:
client.dropFunction((TDropFunctionRequest) req,
(FunctionManagementHandler) handler);
break;
+ case CREATE_TRIGGER_INSTANCE:
+ client.createTriggerInstance(
+ (TCreateTriggerInstanceReq) req, (TriggerManagementHandler) handler);
+ break;
+ case DROP_TRIGGER_INSTANCE:
+ client.dropTriggerInstance(
+ (TDropTriggerInstanceReq) req, (TriggerManagementHandler) handler);
+ break;
+ case ACTIVE_TRIGGER_INSTANCE:
+ client.activeTriggerInstance(
+ (TActiveTriggerInstanceReq) req, (TriggerManagementHandler) handler);
+ break;
+ case INACTIVE_TRIGGER_INSTANCE:
+ client.inactiveTriggerInstance(
+ (TInactiveTriggerInstanceReq) req, (TriggerManagementHandler) handler);
+ break;
case MERGE:
case FULL_MERGE:
client.merge((MergeHandler) handler);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
new file mode 100644
index 0000000000..00d8047bdf
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/TriggerManagementHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class TriggerManagementHandler extends AbstractRetryHandler
+ implements AsyncMethodCallback<TSStatus> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManagementHandler.class);
+
+ private final List<TSStatus> dataNodeResponseStatus;
+
+ public TriggerManagementHandler(
+ CountDownLatch countDownLatch,
+ DataNodeRequestType requestType,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ List<TSStatus> dataNodeResponseStatus) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+ this.dataNodeResponseStatus = dataNodeResponseStatus;
+ }
+
+ @Override
+ public void onComplete(TSStatus response) {
+ dataNodeResponseStatus.add(response);
+ if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
+ LOGGER.info("Successfully {} on DataNode: {}", dataNodeRequestType, targetDataNode);
+ } else {
+ LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
+ }
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(Exception exception) {
+ dataNodeResponseStatus.add(
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(targetDataNode + exception.getMessage()));
+ LOGGER.info("Failed to {} on DataNode: {}", dataNodeRequestType, targetDataNode);
+ countDownLatch.countDown();
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 56988a9176..29fdac0f6e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -100,6 +100,10 @@ public class ConfigNodeConfig {
private String udfLibDir =
IoTDBConstant.EXT_FOLDER_NAME + File.separator +
IoTDBConstant.UDF_FOLDER_NAME;
+ /** External lib directory for Trigger, stores user-uploaded JAR files */
+ private String triggerLibDir =
+ IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TRIGGER_FOLDER_NAME;
+
/** External temporary lib directory for storing downloaded JAR files */
private String temporaryLibDir =
IoTDBConstant.EXT_FOLDER_NAME + File.separator +
IoTDBConstant.UDF_TMP_FOLDER_NAME;
@@ -190,6 +194,7 @@ public class ConfigNodeConfig {
extLibDir = addHomeDir(extLibDir);
udfLibDir = addHomeDir(udfLibDir);
temporaryLibDir = addHomeDir(temporaryLibDir);
+ triggerLibDir = addHomeDir(triggerLibDir);
}
private String addHomeDir(String dir) {
@@ -385,6 +390,22 @@ public class ConfigNodeConfig {
this.udfLibDir = udfLibDir;
}
+ public String getExtLibDir() {
+ return extLibDir;
+ }
+
+ public void setExtLibDir(String extLibDir) {
+ this.extLibDir = extLibDir;
+ }
+
+ public String getTriggerLibDir() {
+ return triggerLibDir;
+ }
+
+ public void setTriggerLibDir(String triggerLibDir) {
+ this.triggerLibDir = triggerLibDir;
+ }
+
public String getTemporaryLibDir() {
return temporaryLibDir;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 2167564aa0..800bbeb0e5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -202,6 +202,8 @@ public class ConfigNodeDescriptor {
conf.setTemporaryLibDir(
properties.getProperty("temporary_lib_dir",
conf.getTemporaryLibDir()));
+ conf.setTriggerLibDir(properties.getProperty("trigger_lib_dir", conf.getTriggerLibDir()));
+
conf.setTimePartitionInterval(
Long.parseLong(
properties.getProperty(
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 9073554974..d4681a860d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
@@ -60,6 +61,9 @@ import
org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan
import
org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -216,6 +220,18 @@ public abstract class ConfigPhysicalPlan implements
IConsensusRequest {
case DropFunction:
req = new DropFunctionPlan();
break;
+ case AddTriggerInTable:
+ req = new AddTriggerInTablePlan();
+ break;
+ case DeleteTriggerInTable:
+ req = new DeleteTriggerInTablePlan();
+ break;
+ case UpdateTriggerStateInTable:
+ req = new UpdateTriggerStateInTablePlan();
+ break;
+ case GetTriggerTable:
+ req = new GetTriggerTablePlan();
+ break;
case CreateSchemaTemplate:
req = new CreateSchemaTemplatePlan();
break;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index b28ccf90b2..30a00fff43 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -81,5 +81,9 @@ public enum ConfigPhysicalPlanType {
PollRegionMaintainTask,
CreatePipeSink,
DropPipeSink,
- GetPipeSink
+ GetPipeSink,
+ AddTriggerInTable,
+ DeleteTriggerInTable,
+ GetTriggerTable,
+ UpdateTriggerStateInTable
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
similarity index 57%
copy from node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
index 832667fa91..da1caa3ca7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
@@ -16,36 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.path;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+package org.apache.iotdb.confignode.consensus.request.read;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public enum PathType {
- Measurement((byte) 0),
- Aligned((byte) 1),
- Partial((byte) 2),
- Path((byte) 3);
-
- private final byte pathType;
+public class GetTriggerTablePlan extends ConfigPhysicalPlan {
- PathType(byte pathType) {
- this.pathType = pathType;
+ public GetTriggerTablePlan() {
+ super(ConfigPhysicalPlanType.GetTriggerTable);
}
- public void serialize(ByteBuffer buffer) {
- ReadWriteIOUtils.write(pathType, buffer);
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(ConfigPhysicalPlanType.GetTriggerTable.ordinal());
}
- public void serialize(DataOutputStream stream) throws IOException {
- ReadWriteIOUtils.write(pathType, stream);
- }
-
- public void serialize(PublicBAOS stream) throws IOException {
- ReadWriteIOUtils.write(pathType, stream);
- }
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/AddTriggerInTablePlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/AddTriggerInTablePlan.java
new file mode 100644
index 0000000000..fd658e4aae
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/AddTriggerInTablePlan.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AddTriggerInTablePlan extends ConfigPhysicalPlan {
+
+ private TriggerInformation triggerInformation;
+ private Binary jarFile;
+
+ public AddTriggerInTablePlan() {
+ super(ConfigPhysicalPlanType.AddTriggerInTable);
+ }
+
+ public AddTriggerInTablePlan(TriggerInformation triggerInformation, Binary jarFile) {
+ super(ConfigPhysicalPlanType.AddTriggerInTable);
+ this.triggerInformation = triggerInformation;
+ this.jarFile = jarFile;
+ }
+
+ public TriggerInformation getTriggerInformation() {
+ return triggerInformation;
+ }
+
+ public void setTriggerInformation(TriggerInformation triggerInformation) {
+ this.triggerInformation = triggerInformation;
+ }
+
+ public Binary getJarFile() {
+ return jarFile;
+ }
+
+ public void setJarFile(Binary jarFile) {
+ this.jarFile = jarFile;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(getType().ordinal());
+
+ triggerInformation.serialize(stream);
+ if (jarFile == null) {
+ ReadWriteIOUtils.write(true, stream);
+ } else {
+ ReadWriteIOUtils.write(false, stream);
+ ReadWriteIOUtils.write(jarFile, stream);
+ }
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ triggerInformation = TriggerInformation.deserialize(buffer);
+ if (ReadWriteIOUtils.readBool(buffer)) {
+ return;
+ }
+ jarFile = ReadWriteIOUtils.readBinary(buffer);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/DeleteTriggerInTablePlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/DeleteTriggerInTablePlan.java
new file mode 100644
index 0000000000..c91218f86a
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/DeleteTriggerInTablePlan.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DeleteTriggerInTablePlan extends ConfigPhysicalPlan {
+
+ private String triggerName;
+
+ public DeleteTriggerInTablePlan() {
+ super(ConfigPhysicalPlanType.DeleteTriggerInTable);
+ }
+
+ public DeleteTriggerInTablePlan(String triggerName) {
+ super(ConfigPhysicalPlanType.DeleteTriggerInTable);
+ this.triggerName = triggerName;
+ }
+
+ public String getTriggerName() {
+ return triggerName;
+ }
+
+ public void setTriggerName(String triggerName) {
+ this.triggerName = triggerName;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(getType().ordinal());
+
+ ReadWriteIOUtils.write(triggerName, stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ triggerName = ReadWriteIOUtils.readString(buffer);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerStateInTablePlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerStateInTablePlan.java
new file mode 100644
index 0000000000..448b69291f
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerStateInTablePlan.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class UpdateTriggerStateInTablePlan extends ConfigPhysicalPlan {
+
+ private String triggerName;
+ private TTriggerState triggerState;
+
+ public UpdateTriggerStateInTablePlan() {
+ super(ConfigPhysicalPlanType.UpdateTriggerStateInTable);
+ }
+
+ public UpdateTriggerStateInTablePlan(String triggerName, TTriggerState triggerState) {
+ super(ConfigPhysicalPlanType.UpdateTriggerStateInTable);
+ this.triggerName = triggerName;
+ this.triggerState = triggerState;
+ }
+
+ public String getTriggerName() {
+ return triggerName;
+ }
+
+ public void setTriggerName(String triggerName) {
+ this.triggerName = triggerName;
+ }
+
+ public TTriggerState getTriggerState() {
+ return triggerState;
+ }
+
+ public void setTriggerState(TTriggerState triggerState) {
+ this.triggerState = triggerState;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(getType().ordinal());
+
+ ReadWriteIOUtils.write(triggerName, stream);
+ ReadWriteIOUtils.write(triggerState.getValue(), stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ triggerName = ReadWriteIOUtils.readString(buffer);
+ triggerState = TTriggerState.findByValue(ReadWriteIOUtils.readInt(buffer));
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerTableResp.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerTableResp.java
new file mode 100644
index 0000000000..77305ac07f
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerTableResp.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TriggerTableResp implements DataSet {
+
+ private TSStatus status;
+
+ private List<TriggerInformation> allTriggerInformation;
+
+ public TriggerTableResp() {}
+
+ public TriggerTableResp(TSStatus status, List<TriggerInformation> allTriggerInformation) {
+ this.status = status;
+ this.allTriggerInformation = allTriggerInformation;
+ }
+
+ public TSStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(TSStatus status) {
+ this.status = status;
+ }
+
+ public List<TriggerInformation> getAllTriggerInformation() {
+ return allTriggerInformation;
+ }
+
+ public void setAllTriggerInformation(List<TriggerInformation> allTriggerInformation) {
+ this.allTriggerInformation = allTriggerInformation;
+ }
+
+ public TGetTriggerTableResp convertToThriftResponse() throws IOException {
+ List<ByteBuffer> triggerInformationByteBuffers = new ArrayList<>();
+
+ for (TriggerInformation triggerInformation : allTriggerInformation) {
+ triggerInformationByteBuffers.add(triggerInformation.serialize());
+ }
+
+ return new TGetTriggerTableResp(status, triggerInformationByteBuffers);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 63be337df4..445d69fd04 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -76,6 +76,7 @@ import
org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.AuthorInfo;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
@@ -83,13 +84,16 @@ import
org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -151,6 +155,8 @@ public class ConfigManager implements IManager {
/** UDF */
private final UDFManager udfManager;
+ /** Manage Trigger */
+ private final TriggerManager triggerManager;
/** Sync */
private final SyncManager syncManager;
@@ -162,6 +168,7 @@ public class ConfigManager implements IManager {
AuthorInfo authorInfo = new AuthorInfo();
ProcedureInfo procedureInfo = new ProcedureInfo();
UDFInfo udfInfo = new UDFInfo();
+ TriggerInfo triggerInfo = new TriggerInfo();
ClusterSyncInfo syncInfo = new ClusterSyncInfo();
// Build state machine and executor
@@ -173,6 +180,7 @@ public class ConfigManager implements IManager {
authorInfo,
procedureInfo,
udfInfo,
+ triggerInfo,
syncInfo);
PartitionRegionStateMachine stateMachine = new
PartitionRegionStateMachine(this, executor);
@@ -183,6 +191,7 @@ public class ConfigManager implements IManager {
this.permissionManager = new PermissionManager(this, authorInfo);
this.procedureManager = new ProcedureManager(this, procedureInfo);
this.udfManager = new UDFManager(this, udfInfo);
+ this.triggerManager = new TriggerManager(this, triggerInfo);
this.loadManager = new LoadManager(this);
this.syncManager = new SyncManager(this, syncInfo);
@@ -595,6 +604,11 @@ public class ConfigManager implements IManager {
return loadManager;
}
+ /**
+  * Get TriggerManager
+  *
+  * @return the TriggerManager instance held by this ConfigManager
+  */
+ @Override
+ public TriggerManager getTriggerManager() {
+ return triggerManager;
+ }
+
@Override
public TSStatus operatePermission(AuthorPlan authorPlan) {
TSStatus status = confirmLeader();
@@ -747,6 +761,30 @@ public class ConfigManager implements IManager {
: status;
}
+  /**
+   * Forwards a create-trigger request to the TriggerManager once confirmLeader() reports success.
+   *
+   * @param req the create-trigger request
+   * @return the status from confirmLeader() when it is not SUCCESS_STATUS, otherwise the result
+   *     of TriggerManager#createTrigger
+   */
+  @Override
+  public TSStatus createTrigger(TCreateTriggerReq req) {
+    final TSStatus leaderStatus = confirmLeader();
+    if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return leaderStatus;
+    }
+    return triggerManager.createTrigger(req);
+  }
+
+  /**
+   * Forwards a drop-trigger request to the TriggerManager once confirmLeader() reports success.
+   *
+   * @param req the drop-trigger request
+   * @return the status from confirmLeader() when it is not SUCCESS_STATUS, otherwise the result
+   *     of TriggerManager#dropTrigger
+   */
+  @Override
+  public TSStatus dropTrigger(TDropTriggerReq req) {
+    final TSStatus leaderStatus = confirmLeader();
+    if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return leaderStatus;
+    }
+    return triggerManager.dropTrigger(req);
+  }
+
+  /**
+   * Fetches the trigger table from the TriggerManager once confirmLeader() reports success.
+   *
+   * @return the response of TriggerManager#getTriggerTable, or a response carrying only the
+   *     status from confirmLeader() when that status is not SUCCESS_STATUS
+   */
+  @Override
+  public TGetTriggerTableResp getTriggerTable() {
+    final TSStatus leaderStatus = confirmLeader();
+    if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return new TGetTriggerTableResp().setStatus(leaderStatus);
+    }
+    return triggerManager.getTriggerTable();
+  }
+
@Override
public TSStatus merge() {
TSStatus status = confirmLeader();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 38908a89a7..64519e0770 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -45,13 +45,16 @@ import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -121,6 +124,13 @@ public interface IManager {
*/
UDFManager getUDFManager();
+ /**
+  * Get the TriggerManager of this manager.
+  *
+  * @return TriggerManager instance
+  */
+ TriggerManager getTriggerManager();
+
/**
* Get ProcedureManager
*
@@ -278,6 +288,15 @@ public interface IManager {
TSStatus dropFunction(String udfName);
+ /**
+  * Create trigger
+  *
+  * @param req the create-trigger request
+  * @return status of the operation
+  */
+ TSStatus createTrigger(TCreateTriggerReq req);
+
+ /**
+  * Drop trigger
+  *
+  * @param req the drop-trigger request
+  * @return status of the operation
+  */
+ TSStatus dropTrigger(TDropTriggerReq req);
+
+ /**
+  * Get the trigger table.
+  *
+  * <p>NOTE(review): the original note read "Show trigger & DataNode start"; presumably this is
+  * called for SHOW TRIGGERS and when a DataNode starts - confirm against the callers.
+  *
+  * @return response carrying a status and the trigger table
+  */
+ TGetTriggerTableResp getTriggerTable();
+
/** Merge on all DataNodes */
TSStatus merge();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index ecc84cf8b4..6b9f3e819a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -35,6 +36,7 @@ import
org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
@@ -52,6 +54,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -222,6 +225,25 @@ public class ProcedureManager {
}
}
+  /**
+   * Submits a CreateTriggerProcedure for the given trigger and waits until it has finished.
+   *
+   * @param triggerInformation the trigger to create, handed to the procedure
+   * @param jarFile the trigger's jar file, handed to the procedure
+   * @return SUCCESS_STATUS if the procedure succeeded, otherwise a CREATE_TRIGGER_ERROR status
+   *     carrying the message of the first collected status
+   */
+  public TSStatus createTrigger(TriggerInformation triggerInformation, Binary jarFile) {
+    final CreateTriggerProcedure procedure =
+        new CreateTriggerProcedure(triggerInformation, jarFile);
+    final long procedureId = executor.submitProcedure(procedure);
+    final List<TSStatus> procedureStatus = new ArrayList<>();
+    if (!waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus)) {
+      return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
+          .setMessage(procedureStatus.get(0).getMessage());
+    }
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
/**
* Waiting until the specific procedures finished
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
new file mode 100644
index 0000000000..cc329952d6
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.trigger.api.enums.TriggerType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Entry point for trigger management on the ConfigNode. Trigger creation is delegated to the
+ * ProcedureManager; the trigger table is read through the ConsensusManager.
+ */
+public class TriggerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
+
+  private final ConfigManager configManager;
+  private final TriggerInfo triggerInfo;
+
+  public TriggerManager(ConfigManager configManager, TriggerInfo triggerInfo) {
+    this.configManager = configManager;
+    this.triggerInfo = triggerInfo;
+  }
+
+  public TriggerInfo getTriggerInfo() {
+    return triggerInfo;
+  }
+
+  /**
+   * Create a trigger in cluster.
+   *
+   * <p>If TriggerType is STATELESS, we should create TriggerInstance on all DataNodes, the
+   * DataNodeLocation in TriggerInformation will be null.
+   *
+   * <p>If TriggerType is STATEFUL, we should create TriggerInstance on the DataNode with the
+   * lowest load, and DataNodeLocation of this DataNode will be saved. If no such DataNode can be
+   * determined, the request is rejected with CREATE_TRIGGER_ERROR.
+   *
+   * <p>All DataNodes will add TriggerInformation of this trigger in local TriggerTable.
+   *
+   * @param req the createTrigger request
+   * @return status of create this trigger
+   */
+  public TSStatus createTrigger(TCreateTriggerReq req) {
+    boolean isStateful = TriggerType.construct(req.getTriggerType()) == TriggerType.STATEFUL;
+    TDataNodeLocation dataNodeLocation = null;
+    if (isStateful) {
+      dataNodeLocation = configManager.getNodeManager().getLowestLoadDataNode();
+      if (dataNodeLocation == null) {
+        // A stateful trigger has to be bound to one DataNode; refuse to register it with a
+        // null location instead of failing later when the location is used.
+        final String errorMessage =
+            String.format(
+                "Failed to create stateful trigger [%s], no DataNode is available to hold its instance",
+                req.getTriggerName());
+        LOGGER.warn(errorMessage);
+        return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
+            .setMessage(errorMessage);
+      }
+    }
+    // New triggers start INACTIVE
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            (PartialPath) PathDeserializeUtil.deserialize(req.pathPattern),
+            req.getTriggerName(),
+            req.getClassName(),
+            req.getJarPath(),
+            req.getAttributes(),
+            TriggerEvent.construct(req.triggerEvent),
+            TTriggerState.INACTIVE,
+            isStateful,
+            dataNodeLocation,
+            req.getJarMD5());
+    return configManager
+        .getProcedureManager()
+        .createTrigger(triggerInformation, new Binary(req.getJarFile()));
+  }
+
+  /**
+   * Drop a trigger. Not implemented yet.
+   *
+   * @param req the dropTrigger request
+   * @return an EXECUTE_STATEMENT_ERROR status saying the operation is unsupported
+   */
+  public TSStatus dropTrigger(TDropTriggerReq req) {
+    // TODO implement drop trigger.
+    // Return an explicit failure rather than null, so callers always get a usable status.
+    return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+        .setMessage("Drop trigger is not supported yet");
+  }
+
+  /**
+   * Read the trigger table through the ConsensusManager.
+   *
+   * @return the trigger table converted to its thrift response, or a response with
+   *     EXECUTE_STATEMENT_ERROR and an empty list if the conversion throws an IOException
+   */
+  public TGetTriggerTableResp getTriggerTable() {
+    try {
+      return ((TriggerTableResp)
+              configManager.getConsensusManager().read(new GetTriggerTablePlan()).getDataset())
+          .convertToThriftResponse();
+    } catch (IOException e) {
+      LOGGER.error("Fail to get TriggerTable", e);
+      return new TGetTriggerTableResp(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+              .setMessage(e.getMessage()),
+          Collections.emptyList());
+    }
+  }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index f974a1de24..f997298e3d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -85,6 +85,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -703,6 +704,29 @@ public class NodeManager {
return result;
}
+  /**
+   * Get the DataNodeLocation of the node in nodeCacheMap which has the lowest load score
+   *
+   * @return the registered location of that node, or null when no node could be selected
+   *     (nodeCacheMap is empty or no score is below Long.MAX_VALUE) or when the selected id has
+   *     no registered DataNode location
+   */
+  public TDataNodeLocation getLowestLoadDataNode() {
+    // -1 marks "nothing selected yet"; without it an empty scan would silently pick node id 0.
+    // NOTE(review): assumes -1 is never a real node id - confirm.
+    final AtomicInteger lowestLoadNodeId = new AtomicInteger(-1);
+    final AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
+
+    nodeCacheMap.forEach(
+        (nodeId, heartbeatCache) -> {
+          long score = heartbeatCache.getLoadScore();
+          if (score < lowestLoadScore.get()) {
+            lowestLoadNodeId.set(nodeId);
+            lowestLoadScore.set(score);
+          }
+        });
+
+    if (lowestLoadNodeId.get() == -1) {
+      LOGGER.warn("Failed to get the lowest load DataNode, no candidate found in nodeCacheMap");
+      return null;
+    }
+
+    LOGGER.info(
+        "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]",
+        lowestLoadNodeId,
+        lowestLoadScore);
+    return configManager
+        .getNodeManager()
+        .getRegisteredDataNodeLocations()
+        .get(lowestLoadNodeId.get());
+  }
+
public boolean isNodeRemoving(int dataNodeId) {
DataNodeHeartbeatCache cache =
(DataNodeHeartbeatCache)
configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
new file mode 100644
index 0000000000..e9f4fac2fd
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.trigger.TriggerTable;
+import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TriggerInfo implements SnapshotProcessor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TriggerInfo.class);
+
+ private static final ConfigNodeConfig CONFIG_NODE_CONF =
+ ConfigNodeDescriptor.getInstance().getConf();
+
+ private final TriggerTable triggerTable;
+ private final Map<String, String> existedJarToMD5;
+ // private final Map<String, AtomicInteger> jarReferenceTable;
+
+ private final TriggerExecutableManager triggerExecutableManager;
+
+ private final ReentrantLock triggerTableLock = new ReentrantLock();
+
+ private final String snapshotFileName = "trigger_info.bin";
+
+ public TriggerInfo() throws IOException {
+ triggerTable = new TriggerTable();
+ existedJarToMD5 = new HashMap<>();
+ // jarReferenceTable = new ConcurrentHashMap<>();
+ triggerExecutableManager =
+ TriggerExecutableManager.setupAndGetInstance(
+ CONFIG_NODE_CONF.getTemporaryLibDir(),
CONFIG_NODE_CONF.getTriggerLibDir());
+ }
+
+ public void acquireTriggerTableLock() {
+ LOGGER.info("acquire TriggerTableLock");
+ triggerTableLock.lock();
+ }
+
+ public void releaseTriggerTableLock() {
+ LOGGER.info("release TriggerTableLock");
+ triggerTableLock.unlock();
+ }
+
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws TException,
IOException {
+ // TODO implement when 'Drop Trigger' done
+ return true;
+ }
+
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws TException,
IOException {
+ // TODO implement when 'Drop Trigger' done
+ }
+
+ /**
+ * Validate whether the trigger can be created
+ *
+ * @param triggerName
+ * @param jarName
+ * @param jarMD5
+ */
+ public void validate(String triggerName, String jarName, String jarMD5) {
+ if (triggerTable.containsTrigger(triggerName)) {
+ throw new TriggerManagementException(
+ String.format(
+ "Failed to create trigger [%s], the same name trigger has been
created",
+ triggerName));
+ }
+
+ if (existedJarToMD5.containsKey(jarName) &&
!existedJarToMD5.get(jarName).equals(jarMD5)) {
+ throw new TriggerManagementException(
+ String.format(
+ "Failed to create trigger [%s], the same name Jar [%s] but
different MD5 [%s] has existed",
+ triggerName, jarName, jarMD5));
+ }
+ }
+
+ public boolean needToSaveJar(String jarName) {
+ return !existedJarToMD5.containsKey(jarName);
+ }
+
+ public TSStatus addTriggerInTable(AddTriggerInTablePlan physicalPlan) {
+ try {
+ TriggerInformation triggerInformation =
physicalPlan.getTriggerInformation();
+ triggerTable.addTriggerInformation(triggerInformation.getTriggerName(),
triggerInformation);
+ existedJarToMD5.put(triggerInformation.getJarName(),
triggerInformation.getJarFileMD5());
+ if (physicalPlan.getJarFile() != null) {
+ triggerExecutableManager.writeToLibDir(
+ ByteBuffer.wrap(physicalPlan.getJarFile().getValues()),
+ triggerInformation.getJarName());
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (Exception e) {
+ final String errorMessage =
+ String.format(
+ "Failed to add trigger [%s] in TriggerTable on Config Nodes,
because of %s",
+ physicalPlan.getTriggerInformation().getTriggerName(), e);
+ LOGGER.warn(errorMessage, e);
+ return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(errorMessage);
+ }
+ }
+
+ public TSStatus deleteTriggerInTable(DeleteTriggerInTablePlan physicalPlan) {
+ String triggerName = physicalPlan.getTriggerName();
+ if (triggerTable.containsTrigger(triggerName)) {
+
existedJarToMD5.remove(triggerTable.getTriggerInformation(triggerName).getJarName());
+ triggerTable.deleteTriggerInformation(triggerName);
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ public TSStatus updateTriggerStateInTable(UpdateTriggerStateInTablePlan
physicalPlan) {
+ triggerTable.setTriggerState(physicalPlan.getTriggerName(),
physicalPlan.getTriggerState());
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ public TriggerTableResp getTriggerTable() {
+ return new TriggerTableResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+ triggerTable.getAllTriggerInformation());
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 6f0023c7bf..4fc66d6a81 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -61,11 +61,15 @@ import
org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan
import
org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
import org.apache.iotdb.confignode.persistence.AuthorInfo;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
@@ -105,6 +109,7 @@ public class ConfigPlanExecutor {
private final UDFInfo udfInfo;
+ private final TriggerInfo triggerInfo;
private final ClusterSyncInfo syncInfo;
public ConfigPlanExecutor(
@@ -114,6 +119,7 @@ public class ConfigPlanExecutor {
AuthorInfo authorInfo,
ProcedureInfo procedureInfo,
UDFInfo udfInfo,
+ TriggerInfo triggerInfo,
ClusterSyncInfo syncInfo) {
this.nodeInfo = nodeInfo;
this.clusterSchemaInfo = clusterSchemaInfo;
@@ -121,6 +127,7 @@ public class ConfigPlanExecutor {
this.authorInfo = authorInfo;
this.procedureInfo = procedureInfo;
this.udfInfo = udfInfo;
+ this.triggerInfo = triggerInfo;
this.syncInfo = syncInfo;
}
@@ -163,6 +170,8 @@ public class ConfigPlanExecutor {
return clusterSchemaInfo.getAllTemplateSetInfo();
case GetPipeSink:
return syncInfo.getPipeSink((GetPipeSinkPlan) req);
+ case GetTriggerTable:
+ return triggerInfo.getTriggerTable();
default:
throw new UnknownPhysicalPlanTypeException(req.getType());
}
@@ -234,6 +243,12 @@ public class ConfigPlanExecutor {
return udfInfo.createFunction((CreateFunctionPlan) physicalPlan);
case DropFunction:
return udfInfo.dropFunction((DropFunctionPlan) physicalPlan);
+ case AddTriggerInTable:
+ return triggerInfo.addTriggerInTable((AddTriggerInTablePlan)
physicalPlan);
+ case DeleteTriggerInTable:
+ return triggerInfo.deleteTriggerInTable((DeleteTriggerInTablePlan)
physicalPlan);
+ case UpdateTriggerStateInTable:
+ return
triggerInfo.updateTriggerStateInTable((UpdateTriggerStateInTablePlan)
physicalPlan);
case CreateSchemaTemplate:
return
clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) physicalPlan);
case UpdateRegionLocation:
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 82dffe0031..f3778361a2 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import
org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
@@ -49,16 +50,23 @@ import
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -362,6 +370,80 @@ public class ConfigNodeProcedureEnv {
return getPartitionManager().getAllReplicaSets(storageGroup);
}
+ public List<TSStatus> createTriggerOnDataNodes(
+ TriggerInformation triggerInformation, Binary jarFile) throws
IOException {
+ NodeManager nodeManager = configManager.getNodeManager();
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ nodeManager.getRegisteredDataNodeLocations();
+ final List<TSStatus> dataNodeResponseStatus =
+ Collections.synchronizedList(new
ArrayList<>(dataNodeLocationMap.size()));
+ final TCreateTriggerInstanceReq request =
+ new TCreateTriggerInstanceReq(
+ triggerInformation.serialize(),
ByteBuffer.wrap(jarFile.getValues()));
+ // TODO: The request sent to DataNodes which stateful triggerInstance
needn't to be created
+ // don't set
+ // JarFile
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(
+ request,
+ dataNodeLocationMap,
+ DataNodeRequestType.CREATE_TRIGGER_INSTANCE,
+ dataNodeResponseStatus);
+ return dataNodeResponseStatus;
+ }
+
+  /**
+   * Builds a TDropTriggerInstanceReq for the trigger's name and hands it to
+   * AsyncDataNodeClientPool for every registered DataNode.
+   *
+   * @param triggerInformation the trigger to drop; only its name is used
+   * @return the synchronized list that was passed to the client pool to collect the DataNode
+   *     responses
+   * @throws IOException declared by the signature
+   */
+  public List<TSStatus> dropTriggerOnDataNodes(TriggerInformation triggerInformation)
+      throws IOException {
+    final Map<Integer, TDataNodeLocation> targetDataNodes =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final List<TSStatus> responses =
+        Collections.synchronizedList(new ArrayList<>(targetDataNodes.size()));
+    // NOTE(review): the meaning of the second constructor argument (false) is not visible
+    // here - confirm against TDropTriggerInstanceReq
+    final TDropTriggerInstanceReq dropRequest =
+        new TDropTriggerInstanceReq(triggerInformation.getTriggerName(), false);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            dropRequest, targetDataNodes, DataNodeRequestType.DROP_TRIGGER_INSTANCE, responses);
+    return responses;
+  }
+
+  /**
+   * Builds a TActiveTriggerInstanceReq for the given trigger and hands it to
+   * AsyncDataNodeClientPool for every registered DataNode.
+   *
+   * @param triggerName name of the trigger
+   * @return the synchronized list that was passed to the client pool to collect the DataNode
+   *     responses
+   * @throws IOException declared by the signature
+   */
+  public List<TSStatus> activeTriggerOnDataNodes(String triggerName) throws IOException {
+    final Map<Integer, TDataNodeLocation> targetDataNodes =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final List<TSStatus> responses =
+        Collections.synchronizedList(new ArrayList<>(targetDataNodes.size()));
+    final TActiveTriggerInstanceReq activeRequest = new TActiveTriggerInstanceReq(triggerName);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            activeRequest,
+            targetDataNodes,
+            DataNodeRequestType.ACTIVE_TRIGGER_INSTANCE,
+            responses);
+    return responses;
+  }
+
+  /**
+   * Builds a TInactiveTriggerInstanceReq for the given trigger and hands it to
+   * AsyncDataNodeClientPool for every registered DataNode.
+   *
+   * @param triggerName name of the trigger
+   * @return the synchronized list that was passed to the client pool to collect the DataNode
+   *     responses
+   * @throws IOException declared by the signature
+   */
+  public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) throws IOException {
+    final Map<Integer, TDataNodeLocation> targetDataNodes =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final List<TSStatus> responses =
+        Collections.synchronizedList(new ArrayList<>(targetDataNodes.size()));
+    final TInactiveTriggerInstanceReq inactiveRequest =
+        new TInactiveTriggerInstanceReq(triggerName);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            inactiveRequest,
+            targetDataNodes,
+            DataNodeRequestType.INACTIVE_TRIGGER_INSTANCE,
+            responses);
+    return responses;
+  }
+
public LockQueue getNodeLock() {
return nodeLock;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
new file mode 100644
index 0000000000..85bdfaa021
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.TriggerInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.CreateTriggerState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
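+/**
+ * Procedure that creates a trigger: it records the trigger in the ConfigNode trigger table, creates
+ * and activates the trigger instance on the DataNodes, and finally marks the trigger as active.
+ */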
+public class CreateTriggerProcedure extends
AbstractNodeProcedure<CreateTriggerState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(CreateTriggerProcedure.class);
+ private static final int retryThreshold = 5;
+
+ private TriggerInformation triggerInformation;
+ private Binary jarFile;
+
+ /**
+  * Creates an empty procedure; {@code triggerInformation} and {@code jarFile} stay null until
+  * {@code deserialize(ByteBuffer)} fills them in.
+  */
+ public CreateTriggerProcedure() {
+ super();
+ }
+
+ /**
+  * Creates a procedure for the given trigger.
+  *
+  * @param triggerInformation description of the trigger to create
+  * @param jarFile content of the trigger jar; may be null (serialize() handles a null jar)
+  */
+ public CreateTriggerProcedure(TriggerInformation triggerInformation, Binary
jarFile) {
+ super();
+ this.triggerInformation = triggerInformation;
+ this.jarFile = jarFile;
+ }
+
+  /**
+   * Runs the step that belongs to {@code state} and selects the state that follows it.
+   *
+   * <p>Steps, in order: INIT takes the trigger-table lock and validates the trigger; VALIDATED
+   * writes an AddTriggerInTablePlan through consensus; CONFIG_NODE_INACTIVE creates the trigger
+   * instance on the DataNodes; DATA_NODE_INACTIVE activates it on the DataNodes; DATA_NODE_ACTIVE
+   * marks it ACTIVE in the ConfigNode trigger table; CONFIG_NODE_ACTIVE releases the lock and ends
+   * the procedure.
+   *
+   * <p>Any exception thrown by a step is caught here and turned into a procedure failure (or a
+   * bounded retry when the state does not support rollback).
+   *
+   * @param env procedure environment giving access to the ConfigManager and the DataNode calls
+   * @param state the state to execute
+   * @return {@code Flow.NO_MORE_STATE} when there is no trigger information or the last state has
+   *     run, otherwise {@code Flow.HAS_MORE_STATE}
+   */
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateTriggerState state) {
+    if (triggerInformation == null) {
+      return Flow.NO_MORE_STATE;
+    }
+    try {
+      switch (state) {
+        case INIT:
+          LOG.info("Start to create trigger [{}]", triggerInformation.getTriggerName());
+
+          // The lock taken here is released in CONFIG_NODE_ACTIVE or in the INIT rollback.
+          TriggerInfo triggerInfo = env.getConfigManager().getTriggerManager().getTriggerInfo();
+          triggerInfo.acquireTriggerTableLock();
+          triggerInfo.validate(
+              triggerInformation.getTriggerName(),
+              triggerInformation.getJarName(),
+              triggerInformation.getJarFileMD5());
+          setNextState(CreateTriggerState.VALIDATED);
+          break;
+
+        case VALIDATED:
+          ConfigManager configManager = env.getConfigManager();
+          boolean needToSaveJar =
+              configManager
+                  .getTriggerManager()
+                  .getTriggerInfo()
+                  .needToSaveJar(triggerInformation.getJarName());
+
+          LOG.info(
+              "Start to add trigger [{}] in TriggerTable on Config Nodes, needToSaveJar[{}]",
+              triggerInformation.getTriggerName(),
+              needToSaveJar);
+
+          // The jar bytes are only attached to the plan when needToSaveJar is true.
+          ConsensusWriteResponse response =
+              configManager
+                  .getConsensusManager()
+                  .write(
+                      new AddTriggerInTablePlan(
+                          triggerInformation, needToSaveJar ? jarFile : null));
+          if (!response.isSuccessful()) {
+            throw new TriggerManagementException(response.getErrorMessage());
+          }
+
+          setNextState(CreateTriggerState.CONFIG_NODE_INACTIVE);
+          break;
+
+        case CONFIG_NODE_INACTIVE:
+          LOG.info(
+              "Start to create triggerInstance [{}] on Data Nodes",
+              triggerInformation.getTriggerName());
+
+          if (RpcUtils.squashResponseStatusList(
+                      env.createTriggerOnDataNodes(triggerInformation, jarFile))
+                  .getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            setNextState(CreateTriggerState.DATA_NODE_INACTIVE);
+          } else {
+            throw new TriggerManagementException(
+                String.format(
+                    "Fail to create triggerInstance [%s] on Data Nodes",
+                    triggerInformation.getTriggerName()));
+          }
+          break;
+
+        case DATA_NODE_INACTIVE:
+          LOG.info(
+              "Start to active trigger [{}] on Data Nodes", triggerInformation.getTriggerName());
+
+          if (RpcUtils.squashResponseStatusList(
+                      env.activeTriggerOnDataNodes(triggerInformation.getTriggerName()))
+                  .getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            setNextState(CreateTriggerState.DATA_NODE_ACTIVE);
+          } else {
+            throw new TriggerManagementException(
+                String.format(
+                    "Fail to active triggerInstance [%s] on Data Nodes",
+                    triggerInformation.getTriggerName()));
+          }
+          break;
+
+        case DATA_NODE_ACTIVE:
+          LOG.info(
+              "Start to active trigger [{}] on Config Nodes", triggerInformation.getTriggerName());
+          // Check the consensus result, as the VALIDATED step does, so that a failed state
+          // update does not silently advance the procedure to CONFIG_NODE_ACTIVE.
+          ConsensusWriteResponse activeResponse =
+              env.getConfigManager()
+                  .getConsensusManager()
+                  .write(
+                      new UpdateTriggerStateInTablePlan(
+                          triggerInformation.getTriggerName(), TTriggerState.ACTIVE));
+          if (!activeResponse.isSuccessful()) {
+            throw new TriggerManagementException(activeResponse.getErrorMessage());
+          }
+          setNextState(CreateTriggerState.CONFIG_NODE_ACTIVE);
+          break;
+
+        case CONFIG_NODE_ACTIVE:
+          env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+          return Flow.NO_MORE_STATE;
+      }
+    } catch (Exception e) {
+      if (isRollbackSupported(state)) {
+        LOG.error("Fail in CreateTriggerProcedure", e);
+        setFailure(new ProcedureException(e.getMessage()));
+      } else {
+        // Reached only when isRollbackSupported(state) is false: keep retrying the same state
+        // until the cycle count exceeds retryThreshold, then fail the procedure.
+        LOG.error(
+            "Retrievable error trying to create trigger [{}], state [{}]",
+            triggerInformation.getTriggerName(),
+            state,
+            e);
+        if (getCycles() > retryThreshold) {
+          setFailure(
+              new ProcedureException(
+                  String.format(
+                      "Fail to create trigger [%s] at STATE [%s]",
+                      triggerInformation.getTriggerName(), state)));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Undoes the work done by the step that belongs to {@code state}.
+   *
+   * <p>INIT releases the trigger-table lock; VALIDATED deletes the trigger from the ConfigNode
+   * trigger table; CONFIG_NODE_INACTIVE drops the trigger instance on the DataNodes;
+   * DATA_NODE_INACTIVE deactivates the trigger on the DataNodes. Other states need no rollback.
+   *
+   * @param env procedure environment giving access to the ConfigManager and the DataNode calls
+   * @param state the state whose effects are rolled back
+   * @throws TriggerManagementException if the DataNodes do not all report success
+   */
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, CreateTriggerState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case INIT:
+        LOG.info("Start [INIT] rollback of trigger [{}]", triggerInformation.getTriggerName());
+
+        env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+        break;
+
+      case VALIDATED:
+        LOG.info("Start [VALIDATED] rollback of trigger [{}]", triggerInformation.getTriggerName());
+
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new DeleteTriggerInTablePlan(triggerInformation.getTriggerName()));
+        break;
+
+      case CONFIG_NODE_INACTIVE:
+        LOG.info(
+            "Start to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+            triggerInformation.getTriggerName());
+
+        if (RpcUtils.squashResponseStatusList(env.dropTriggerOnDataNodes(triggerInformation))
+                .getCode()
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          // String.format needs %s; the SLF4J-style {} placeholder would be printed literally.
+          throw new TriggerManagementException(
+              String.format(
+                  "Fail to [CONFIG_NODE_INACTIVE] rollback of trigger [%s]",
+                  triggerInformation.getTriggerName()));
+        }
+        break;
+
+      case DATA_NODE_INACTIVE:
+        LOG.info(
+            "Start to [DATA_NODE_INACTIVE] rollback of trigger [{}]",
+            triggerInformation.getTriggerName());
+
+        if (RpcUtils.squashResponseStatusList(
+                    env.inactiveTriggerOnDataNodes(triggerInformation.getTriggerName()))
+                .getCode()
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new TriggerManagementException(
+              String.format(
+                  "Fail to [DATA_NODE_INACTIVE] rollback of trigger [%s]",
+                  triggerInformation.getTriggerName()));
+        }
+        break;
+
+      default:
+        break;
+    }
+  }
+
+ /** Reports that every state of this procedure supports rollback (always returns true). */
+ @Override
+ protected boolean isRollbackSupported(CreateTriggerState state) {
+ return true;
+ }
+
+ /**
+  * Maps a state id back to its state; the id is the enum ordinal produced by getStateId, so an
+  * id outside the range of CreateTriggerState.values() raises ArrayIndexOutOfBoundsException.
+  */
+ @Override
+ protected CreateTriggerState getState(int stateId) {
+ return CreateTriggerState.values()[stateId];
+ }
+
+ /** Returns the id of a state, which is its ordinal in CreateTriggerState. */
+ @Override
+ protected int getStateId(CreateTriggerState createTriggerState) {
+ return createTriggerState.ordinal();
+ }
+
+ /** Returns the state the procedure starts in: INIT. */
+ @Override
+ protected CreateTriggerState getInitialState() {
+ return CreateTriggerState.INIT;
+ }
+
+ /** Returns the trigger jar content held by this procedure; may be null. */
+ public Binary getJarFile() {
+ return jarFile;
+ }
+
+  /**
+   * Writes this procedure to {@code stream}: the procedure type ordinal, the state written by the
+   * superclass, the trigger information, a boolean that is {@code true} when there is no jar, and
+   * the jar content when there is one.
+   *
+   * @param stream destination stream
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeInt(ProcedureFactory.ProcedureType.CREATE_TRIGGER_PROCEDURE.ordinal());
+    super.serialize(stream);
+    triggerInformation.serialize(stream);
+    // Marker comes first so deserialize() knows whether jar bytes follow.
+    final boolean jarAbsent = jarFile == null;
+    ReadWriteIOUtils.write(jarAbsent, stream);
+    if (!jarAbsent) {
+      ReadWriteIOUtils.write(jarFile, stream);
+    }
+  }
+
+  /**
+   * Restores this procedure from {@code byteBuffer}, reading the fields in the order serialize()
+   * wrote them after the procedure type: superclass state, trigger information, the "no jar"
+   * marker, and the jar content when the marker is {@code false}.
+   *
+   * @param byteBuffer buffer positioned just after the procedure type
+   */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    triggerInformation = TriggerInformation.deserialize(byteBuffer);
+    final boolean jarAbsent = ReadWriteIOUtils.readBool(byteBuffer);
+    if (!jarAbsent) {
+      jarFile = ReadWriteIOUtils.readBinary(byteBuffer);
+    }
+  }
+
+  /**
+   * Two create-trigger procedures are equal when they have the same procedure id, the same state
+   * and equal trigger information. The jar content is not compared.
+   *
+   * <p>A null {@code triggerInformation} (the state left by the no-arg constructor) only equals
+   * another null one instead of raising a NullPointerException.
+   *
+   * <p>NOTE(review): this class does not override hashCode; add a matching one before using
+   * instances as hash keys.
+   */
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    }
+    if (!(that instanceof CreateTriggerProcedure)) {
+      return false;
+    }
+    CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that;
+    if (thatProc.getProcId() != this.getProcId() || thatProc.getState() != this.getState()) {
+      return false;
+    }
+    return thatProc.triggerInformation == null
+        ? this.triggerInformation == null
+        : thatProc.triggerInformation.equals(this.triggerInformation);
+  }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateTriggerState.java
similarity index 68%
copy from
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateTriggerState.java
index 82705274bf..1ab427b403 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateTriggerState.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -17,14 +17,13 @@
* under the License.
*/
-package org.apache.iotdb.commons.trigger.exception;
+package org.apache.iotdb.confignode.procedure.state;
-public class TriggerJarToLargeException extends RuntimeException {
- public TriggerJarToLargeException(String message) {
- super(message);
- }
-
- public TriggerJarToLargeException(String message, Throwable cause) {
- super(message, cause);
- }
+/**
+ * States of CreateTriggerProcedure, listed in execution order.
+ *
+ * <p>CreateTriggerProcedure converts states to and from ids with ordinal(), so reordering or
+ * inserting constants changes those ids.
+ */
+public enum CreateTriggerState {
+ // Start: take the trigger-table lock and validate the trigger.
+ INIT,
+ // Validation passed: add the trigger to the ConfigNode trigger table.
+ VALIDATED,
+ // Trigger is in the ConfigNode table: create the trigger instance on the DataNodes.
+ CONFIG_NODE_INACTIVE,
+ // Instance exists on the DataNodes: activate it there.
+ DATA_NODE_INACTIVE,
+ // Instance is active on the DataNodes: mark the trigger ACTIVE in the ConfigNode table.
+ DATA_NODE_ACTIVE,
+ // Trigger is ACTIVE on the ConfigNodes: release the lock and finish.
+ CONFIG_NODE_ACTIVE
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index fff4260307..7101f1e7f5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.store;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
@@ -69,6 +70,9 @@ public class ProcedureFactory implements IProcedureFactory {
case DELETE_TIMESERIES_PROCEDURE:
procedure = new DeleteTimeSeriesProcedure();
break;
+ case CREATE_TRIGGER_PROCEDURE:
+ procedure = new CreateTriggerProcedure();
+ break;
default:
LOGGER.error("unknown Procedure type: " + typeNum);
throw new IOException("unknown Procedure type: " + typeNum);
@@ -92,6 +96,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.CREATE_REGION_GROUPS;
} else if (procedure instanceof DeleteTimeSeriesProcedure) {
return ProcedureType.DELETE_TIMESERIES_PROCEDURE;
+ } else if (procedure instanceof CreateTriggerProcedure) {
+ return ProcedureType.CREATE_TRIGGER_PROCEDURE;
}
return null;
}
@@ -103,7 +109,8 @@ public class ProcedureFactory implements IProcedureFactory {
REMOVE_DATA_NODE_PROCEDURE,
REGION_MIGRATE_PROCEDURE,
CREATE_REGION_GROUPS,
- DELETE_TIMESERIES_PROCEDURE
+ DELETE_TIMESERIES_PROCEDURE,
+ CREATE_TRIGGER_PROCEDURE
}
private static class ProcedureFactoryHolder {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index d252e90b49..d71919e555 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -481,20 +481,16 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
- // todo : implementation
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ return configManager.createTrigger(req);
}
@Override
public TSStatus dropTrigger(TDropTriggerReq req) throws TException {
- // todo : implementation
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ return configManager.dropTrigger(req);
}
public TGetTriggerTableResp getTriggerTable() throws TException {
- // todo: implementation
- return new TGetTriggerTableResp(
- new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), null);
+ return configManager.getTriggerTable();
}
@Override
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 7de5442d30..054bf92aac 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -34,6 +34,8 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import
org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
@@ -43,6 +45,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
import
org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
@@ -69,6 +72,9 @@ import
org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan
import
org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
import org.apache.iotdb.confignode.procedure.Procedure;
@@ -77,11 +83,14 @@ import
org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.db.metadata.template.Template;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.Assert;
@@ -873,4 +882,63 @@ public class ConfigPhysicalPlanSerDeTest {
getPipeSinkPlanWithNullName.getPipeSinkName(),
getPipeSinkPlanWithNullName1.getPipeSinkName());
}
+
+  /** A serialized GetTriggerTablePlan must be decoded by the factory as a GetTriggerTablePlan. */
+  @Test
+  public void GetTriggerTablePlan() throws IOException {
+    GetTriggerTablePlan original = new GetTriggerTablePlan();
+    Object decoded = ConfigPhysicalPlan.Factory.create(original.serializeToByteBuffer());
+    Assert.assertTrue(decoded instanceof GetTriggerTablePlan);
+  }
+
+  /** Round-trips an AddTriggerInTablePlan and compares trigger information and jar content. */
+  @Test
+  public void AddTriggerInTablePlanTest() throws IOException, IllegalPathException {
+    final TriggerInformation info =
+        new TriggerInformation(
+            new PartialPath("root.test.**"),
+            "test",
+            "test.class",
+            "test.jar",
+            null,
+            TriggerEvent.AFTER_INSERT,
+            TTriggerState.INACTIVE,
+            false,
+            null,
+            "testMD5test");
+    final Binary jarContent = new Binary(new byte[] {1, 2, 3});
+    final AddTriggerInTablePlan original = new AddTriggerInTablePlan(info, jarContent);
+
+    final AddTriggerInTablePlan decoded =
+        (AddTriggerInTablePlan)
+            ConfigPhysicalPlan.Factory.create(original.serializeToByteBuffer());
+
+    Assert.assertEquals(original.getTriggerInformation(), decoded.getTriggerInformation());
+    Assert.assertEquals(original.getJarFile(), decoded.getJarFile());
+  }
+
+  /** Round-trips a DeleteTriggerInTablePlan and compares the trigger name. */
+  @Test
+  public void DeleteTriggerInTablePlanTest() throws IOException {
+    final DeleteTriggerInTablePlan original = new DeleteTriggerInTablePlan("test");
+
+    final DeleteTriggerInTablePlan decoded =
+        (DeleteTriggerInTablePlan)
+            ConfigPhysicalPlan.Factory.create(original.serializeToByteBuffer());
+
+    Assert.assertEquals(original.getTriggerName(), decoded.getTriggerName());
+  }
+
+  /** Round-trips an UpdateTriggerStateInTablePlan and compares trigger name and state. */
+  @Test
+  public void UpdateTriggerStateInTablePlanTest() throws IOException {
+    final UpdateTriggerStateInTablePlan original =
+        new UpdateTriggerStateInTablePlan("test", TTriggerState.ACTIVE);
+
+    final UpdateTriggerStateInTablePlan decoded =
+        (UpdateTriggerStateInTablePlan)
+            ConfigPhysicalPlan.Factory.create(original.serializeToByteBuffer());
+
+    Assert.assertEquals(original.getTriggerName(), decoded.getTriggerName());
+    Assert.assertEquals(original.getTriggerState(), decoded.getTriggerState());
+  }
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedureTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedureTest.java
new file mode 100644
index 0000000000..6481f6ba60
--- /dev/null
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedureTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class CreateTriggerProcedureTest {
+
+  /**
+   * Serializes a procedure that carries a jar, re-creates it through ProcedureFactory and checks
+   * that the copy equals the original, including the jar content.
+   */
+  @Test
+  public void serializeDeserializeTest() throws IllegalPathException {
+
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            new PartialPath("root.test.**"),
+            "test",
+            "test.class",
+            "test.jar",
+            null,
+            TriggerEvent.AFTER_INSERT,
+            TTriggerState.INACTIVE,
+            false,
+            null,
+            "testMD5test");
+    CreateTriggerProcedure p1 =
+        new CreateTriggerProcedure(triggerInformation, new Binary(new byte[] {1, 2, 3}));
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+      CreateTriggerProcedure p2 =
+          (CreateTriggerProcedure) ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+      assertEquals(p1.getJarFile(), p2.getJarFile());
+
+    } catch (Exception e) {
+      // Report the cause; a bare fail() hides why the round trip broke.
+      fail("Serialization round trip failed: " + e);
+    }
+  }
+
+  /**
+   * Serializes a procedure without a jar, re-creates it through ProcedureFactory and checks that
+   * the copy equals the original and that its jar is still absent.
+   */
+  @Test
+  public void serializeDeserializeWithoutJarFileTest() throws IllegalPathException {
+
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            new PartialPath("root.test.**"),
+            "test",
+            "test.class",
+            "test.jar",
+            null,
+            TriggerEvent.AFTER_INSERT,
+            TTriggerState.INACTIVE,
+            false,
+            null,
+            "testMD5test");
+    CreateTriggerProcedure p1 = new CreateTriggerProcedure(triggerInformation, null);
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+      CreateTriggerProcedure p2 =
+          (CreateTriggerProcedure) ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+      assertEquals(p1.getJarFile(), p2.getJarFile());
+
+    } catch (Exception e) {
+      // Report the cause; a bare fail() hides why the round trip broke.
+      fail("Serialization round trip failed: " + e);
+    }
+  }
+}
diff --git
a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
index 888d0a6ec1..a2a70d72da 100644
--- a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
@@ -29,4 +29,5 @@ data_replication_factor=3
system_dir=target/confignode1/system
data_dirs=target/confignode1/data
consensus_dir=target/confignode1/consensus
+trigger_lib_dir=target/confignode1/ext/trigger
proc_wal_dir=target/confignode1/proc
\ No newline at end of file
diff --git
a/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
b/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
index 66c6045d59..b4a607642b 100644
--- a/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
@@ -28,4 +28,5 @@ data_replication_factor=3
system_dir=target/confignode2/system
data_dirs=target/confignode2/data
consensus_dir=target/confignode2/consensus
+trigger_lib_dir=target/confignode2/ext/trigger
proc_wal_dir=target/confignode2/proc
\ No newline at end of file
diff --git
a/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
b/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
index 0d53e4a6dd..6fca1c3ad7 100644
--- a/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
@@ -28,4 +28,5 @@ data_replication_factor=3
system_dir=target/confignode3/system
data_dirs=target/confignode3/data
consensus_dir=target/confignode3/consensus
+trigger_lib_dir=target/confignode3/ext/trigger
proc_wal_dir=target/confignode3/proc
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
b/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
similarity index 57%
copy from
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
copy to
example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
index 82705274bf..a3c797d41e 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
+++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
@@ -16,15 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.trigger;
-package org.apache.iotdb.commons.trigger.exception;
+import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.iotdb.tsfile.write.record.Tablet;
-public class TriggerJarToLargeException extends RuntimeException {
- public TriggerJarToLargeException(String message) {
- super(message);
- }
+import java.util.Arrays;
+
+public class SimpleTrigger implements Trigger {
- public TriggerJarToLargeException(String message, Throwable cause) {
- super(message, cause);
+ /**
+  * Example trigger action: prints the tablet's device id, the measurement id of each schema and
+  * the timestamp array to standard output.
+  *
+  * @param tablet the tablet passed to the trigger
+  * @return always {@code true}
+  */
+ @Override
+ public boolean fire(Tablet tablet) {
+ System.out.println("receive a tablet, device name is " + tablet.deviceId);
+ System.out.println("measurements are: ");
+ tablet
+ .getSchemas()
+ .forEach(measurementSchema ->
System.out.println(measurementSchema.getMeasurementId()));
+ System.out.println("time are: " + Arrays.toString(tablet.timestamps));
+ return true;
}
}
diff --git
a/example/trigger/src/main/java/org/apache/iotdb/trigger/AlertingExample.java
b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
similarity index 99%
rename from
example/trigger/src/main/java/org/apache/iotdb/trigger/AlertingExample.java
rename to
example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
index c9bb2b663d..ec7fe2bce3 100644
---
a/example/trigger/src/main/java/org/apache/iotdb/trigger/AlertingExample.java
+++
b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.trigger;
+package org.apache.iotdb.trigger.old;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.trigger.api.Trigger;
diff --git
a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/TriggerExample.java
similarity index 99%
rename from
example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
rename to
example/trigger/src/main/java/org/apache/iotdb/trigger/old/TriggerExample.java
index a41346ecc3..6bfe230bd3 100644
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
+++
b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/TriggerExample.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.trigger;
+package org.apache.iotdb.trigger.old;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.trigger.api.Trigger;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 93e81cb656..eb43c9a92d 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.commons.executable;
-import org.apache.iotdb.commons.trigger.exception.TriggerJarToLargeException;
+import org.apache.iotdb.commons.trigger.exception.TriggerJarTooLargeException;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.commons.io.FileUtils;
@@ -196,7 +196,7 @@ public class ExecutableManager {
long size = fileChannel.size();
if (size > Integer.MAX_VALUE) {
// Max length of Thrift Binary is Integer.MAX_VALUE bytes.
- throw new TriggerJarToLargeException(
+ throw new TriggerJarTooLargeException(
String.format("Size of file exceed %d bytes", Integer.MAX_VALUE));
}
ByteBuffer byteBuffer = ByteBuffer.allocate((int) size);
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
index 8ef66a4ac8..de2a16c4ad 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
@@ -31,8 +31,8 @@ import
org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -275,7 +275,7 @@ public class AlignedPath extends PartialPath {
}
@Override
- public void serialize(DataOutputStream stream) throws IOException {
+ public void serialize(OutputStream stream) throws IOException {
PathType.Aligned.serialize(stream);
super.serializeWithoutType(stream);
ReadWriteIOUtils.write(measurementList.size(), stream);
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
b/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
index 0dcbdb66b6..ec57075d68 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -193,7 +194,7 @@ public class MeasurementPath extends PartialPath {
}
@Override
- public void serialize(DataOutputStream stream) throws IOException {
+ public void serialize(OutputStream stream) throws IOException {
PathType.Measurement.serialize(stream);
super.serializeWithoutType(stream);
if (measurementSchema == null) {
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 3122741e43..6d2bde0808 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -33,8 +33,8 @@ import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -676,22 +676,21 @@ public class PartialPath extends Path implements
Comparable<Path>, Cloneable {
}
public ByteBuffer serialize() throws IOException {
- PublicBAOS byteArrayOutputStream = new PublicBAOS();
- DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
- serialize(outputStream);
- return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ PublicBAOS publicBAOS = new PublicBAOS();
+ serialize((OutputStream) publicBAOS);
+ return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
}
@Override
- public void serialize(ByteBuffer byteBuffer) {
- PathType.Partial.serialize(byteBuffer);
- serializeWithoutType(byteBuffer);
+ public void serialize(OutputStream stream) throws IOException {
+ PathType.Partial.serialize(stream);
+ serializeWithoutType(stream);
}
@Override
- public void serialize(DataOutputStream stream) throws IOException {
- PathType.Partial.serialize(stream);
- serializeWithoutType(stream);
+ public void serialize(ByteBuffer byteBuffer) {
+ PathType.Partial.serialize(byteBuffer);
+ serializeWithoutType(byteBuffer);
}
@Override
@@ -710,7 +709,7 @@ public class PartialPath extends Path implements
Comparable<Path>, Cloneable {
}
@Override
- protected void serializeWithoutType(DataOutputStream stream) throws
IOException {
+ protected void serializeWithoutType(OutputStream stream) throws IOException {
super.serializeWithoutType(stream);
ReadWriteIOUtils.write(nodes.length, stream);
for (String node : nodes) {
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
b/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
index 832667fa91..6513fa6d24 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PathType.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.commons.path;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
public enum PathType {
@@ -41,7 +41,7 @@ public enum PathType {
ReadWriteIOUtils.write(pathType, buffer);
}
- public void serialize(DataOutputStream stream) throws IOException {
+ public void serialize(OutputStream stream) throws IOException {
ReadWriteIOUtils.write(pathType, stream);
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
index 0d3649ff6c..119321d143 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.commons.trigger;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
@@ -30,6 +31,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.Objects;
/** This Class used to save the specific information of one Trigger. */
public class TriggerInformation {
@@ -101,7 +103,7 @@ public class TriggerInformation {
public static TriggerInformation deserialize(ByteBuffer byteBuffer) {
TriggerInformation triggerInformation = new TriggerInformation();
- triggerInformation.pathPattern = PartialPath.deserialize(byteBuffer);
+ triggerInformation.pathPattern = (PartialPath)
PathDeserializeUtil.deserialize(byteBuffer);
triggerInformation.triggerName = ReadWriteIOUtils.readString(byteBuffer);
triggerInformation.className = ReadWriteIOUtils.readString(byteBuffer);
triggerInformation.jarName = ReadWriteIOUtils.readString(byteBuffer);
@@ -119,6 +121,28 @@ public class TriggerInformation {
return triggerInformation;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TriggerInformation that = (TriggerInformation) o;
+ return Objects.equals(triggerName, that.triggerName)
+ && Objects.equals(pathPattern, that.pathPattern)
+ && isStateful == that.isStateful
+ && Objects.equals(className, that.className)
+ && Objects.equals(jarName, that.jarName)
+ && Objects.equals(attributes, that.attributes)
+ && event == that.event
+ && triggerState == that.triggerState
+ && (isStateful() ? Objects.equals(dataNodeLocation,
that.dataNodeLocation) : true)
+ && Objects.equals(jarFileMD5, that.jarFileMD5);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(triggerName);
+ }
+
public PartialPath getPathPattern() {
return pathPattern;
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
index a81d511b79..75f97417bb 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
@@ -22,8 +22,11 @@ import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/** This Class used to save the information of Triggers and implements methods
of manipulate it. */
@NotThreadSafe
@@ -31,7 +34,7 @@ public class TriggerTable {
private final Map<String, TriggerInformation> triggerTable;
public TriggerTable() {
- triggerTable = new HashMap<>();
+ triggerTable = new ConcurrentHashMap<>();
}
public TriggerTable(Map<String, TriggerInformation> triggerTable) {
@@ -74,11 +77,15 @@ public class TriggerTable {
return allTriggerStates;
}
// for getTriggerTable
- public Map<String, TriggerInformation> getTable() {
- return triggerTable;
+ public List<TriggerInformation> getAllTriggerInformation() {
+ return new ArrayList<>(triggerTable.values());
}
public boolean isEmpty() {
return triggerTable.isEmpty();
}
+
+ public Map<String, TriggerInformation> getTable() {
+ return triggerTable;
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarTooLargeException.java
similarity index 82%
rename from
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
rename to
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarTooLargeException.java
index 82705274bf..39674a7f79 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarToLargeException.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerJarTooLargeException.java
@@ -19,12 +19,12 @@
package org.apache.iotdb.commons.trigger.exception;
-public class TriggerJarToLargeException extends RuntimeException {
- public TriggerJarToLargeException(String message) {
+public class TriggerJarTooLargeException extends RuntimeException {
+ public TriggerJarTooLargeException(String message) {
super(message);
}
- public TriggerJarToLargeException(String message, Throwable cause) {
+ public TriggerJarTooLargeException(String message, Throwable cause) {
super(message, cause);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index f30a46914d..ee1acf70fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -878,7 +878,7 @@ public class ConfigNodeClient
@Override
public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
- for (int i = 0; i < RETRY_NUM; i++) {
+ for (int i = 0; i < 5; i++) {
try {
TSStatus status = client.createTrigger(req);
if (!updateConfigNodeLeader(status)) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index ee9dde0b24..d91eb57ca7 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -77,12 +77,12 @@ public class ColumnHeaderConstant {
// column names for show triggers statement
public static final String COLUMN_TRIGGER_NAME = "Trigger Name";
- public static final String COLUMN_TRIGGER_EVENT = "Trigger Event";
- public static final String COLUMN_TRIGGER_TYPE = "Trigger Type";
- public static final String COLUMN_TRIGGER_STATE = "Trigger STATE";
- public static final String COLUMN_TRIGGER_PATTERN = "Trigger PathPattern";
- public static final String COLUMN_TRIGGER_CLASSNAME = "Trigger ClassName";
- public static final String COLUMN_TRIGGER_LOCATION = "Trigger Location";
+ public static final String COLUMN_TRIGGER_EVENT = "Event";
+ public static final String COLUMN_TRIGGER_TYPE = "Type";
+ public static final String COLUMN_TRIGGER_STATE = "STATE";
+ public static final String COLUMN_TRIGGER_PATTERN = "PathPattern";
+ public static final String COLUMN_TRIGGER_CLASSNAME = "ClassName";
+ public static final String COLUMN_TRIGGER_LOCATION = "Node ID";
// column names for show region statement
public static final String COLUMN_REGION_ID = "RegionId";
@@ -252,6 +252,7 @@ public class ColumnHeaderConstant {
new ColumnHeader(COLUMN_TRIGGER_TYPE, TSDataType.TEXT),
new ColumnHeader(COLUMN_TRIGGER_STATE, TSDataType.TEXT),
new ColumnHeader(COLUMN_TRIGGER_PATTERN, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_TRIGGER_CLASSNAME, TSDataType.TEXT),
new ColumnHeader(COLUMN_TRIGGER_LOCATION, TSDataType.TEXT));
public static final List<ColumnHeader> showSchemaTemplateHeaders =
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index c562b9e5ab..33db99d1dc 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.executable.ExecutableManager;
import org.apache.iotdb.commons.executable.ExecutableResource;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
@@ -429,11 +428,12 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
getTriggerTableResp.getStatus().message,
getTriggerTableResp.getStatus().code));
return future;
}
+ // convert triggerTable and buildTsBlock
+
ShowTriggersTask.buildTsBlock(getTriggerTableResp.getAllTriggerInformation(),
future);
} catch (TException | IOException e) {
future.setException(e);
}
- // convert triggerTable and buildTsBlock
- ShowTriggersTask.buildTsBlock(new TriggerTable(), future);
+
return future;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowTriggersTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowTriggersTask.java
index 3bc191101c..ccf62f1c46 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowTriggersTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowTriggersTask.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
import org.apache.iotdb.commons.trigger.TriggerInformation;
-import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -37,6 +36,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
@@ -49,14 +49,16 @@ public class ShowTriggersTask implements IConfigTask {
}
public static void buildTsBlock(
- TriggerTable triggerTable, SettableFuture<ConfigTaskResult> future) {
+ List<ByteBuffer> allTriggerInformation, SettableFuture<ConfigTaskResult>
future) {
List<TSDataType> outputDataTypes =
ColumnHeaderConstant.showTriggersColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());
TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
- if (triggerTable != null && !triggerTable.isEmpty()) {
- for (TriggerInformation triggerInformation :
triggerTable.getTable().values()) {
+ if (allTriggerInformation != null && !allTriggerInformation.isEmpty()) {
+ for (ByteBuffer triggerInformationByteBuffer : allTriggerInformation) {
+ TriggerInformation triggerInformation =
+ TriggerInformation.deserialize(triggerInformationByteBuffer);
builder.getTimeColumnBuilder().writeLong(0L);
builder
.getColumnBuilder(0)
@@ -77,13 +79,15 @@ public class ShowTriggersTask implements IConfigTask {
builder
.getColumnBuilder(4)
.writeBinary(Binary.valueOf(triggerInformation.getPathPattern().toString()));
+
builder.getColumnBuilder(5).writeBinary(Binary.valueOf(triggerInformation.getClassName()));
builder
- .getColumnBuilder(5)
+ .getColumnBuilder(6)
.writeBinary(
Binary.valueOf(
!triggerInformation.isStateful()
? "ALL"
- :
triggerInformation.getDataNodeLocation().internalEndPoint.getIp()));
+ : String.valueOf(
+
triggerInformation.getDataNodeLocation().getDataNodeId())));
builder.declarePosition();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index e99eab0125..a3bb7f546d 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
@@ -109,6 +108,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
@@ -925,11 +925,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
public TSStatus createTriggerInstance(TCreateTriggerInstanceReq req) throws
TException {
TriggerInformation triggerInformation =
TriggerInformation.deserialize(req.triggerInformation);
try {
- // set state to INACTIVE when creating trigger instance
- triggerInformation.setTriggerState(TTriggerState.INACTIVE);
// save jar file at trigger_lib_dir
- TriggerExecutableManager.getInstance()
- .writeToLibDir(req.jarFile, triggerInformation.getJarName());
+ if (req.getJarFile() != null) {
+ TriggerExecutableManager.getInstance()
+ .writeToLibDir(req.jarFile, triggerInformation.getJarName());
+ }
// register trigger information with TriggerRegistrationService
// config nodes take responsibility for synchronization control
TriggerManagementService.getInstance().register(triggerInformation);
@@ -959,6 +959,19 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+ @Override
+ public TSStatus inactiveTriggerInstance(TInactiveTriggerInstanceReq req)
throws TException {
+ try {
+ TriggerManagementService.getInstance().inactiveTrigger(req.triggerName);
+ } catch (Exception e) {
+ LOGGER.error("Error occurred during ");
+ return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(e.getMessage());
+ }
+
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
@Override
public TSStatus dropTriggerInstance(TDropTriggerInstanceReq req) throws
TException {
try {
diff --git
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index 25e7209537..4994ec1270 100644
---
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
import org.apache.iotdb.trigger.api.Trigger;
@@ -51,6 +52,8 @@ public class TriggerManagementService {
private final Map<String, TriggerExecutor> executorMap;
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
private static final int DATA_NODE_ID =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private TriggerManagementService() {
@@ -155,8 +158,8 @@ public class TriggerManagementService {
"Failed to registered trigger %s, "
+ "because error occurred when trying to compute md5
of jar file for trigger %s ",
triggerName, triggerName);
- LOGGER.warn(errorMessage);
- throw e;
+ LOGGER.warn(errorMessage, e);
+ throw new TriggerManagementException(errorMessage);
}
}
@@ -214,7 +217,6 @@ public class TriggerManagementService {
"Failed to reflect trigger instance with className(%s), because
%s", className, e));
}
}
-
/////////////////////////////////////////////////////////////////////////////////////////////////
// singleton instance holder
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/server/src/test/resources/datanode1conf/iotdb-datanode.properties
b/server/src/test/resources/datanode1conf/iotdb-datanode.properties
index ad13e4a1d9..f26d613df6 100644
--- a/server/src/test/resources/datanode1conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode1conf/iotdb-datanode.properties
@@ -32,8 +32,9 @@ system_dir=target/datanode1/system
data_dirs=target/datanode1/data
wal_dirs=target/datanode1/wal
index_root_dir=target/datanode1/data/index
-udf_root_dir=target/datanode1/ext
+udf_root_dir=target/datanode1/ext/udf
tracing_dir=target/datanode1/data/tracing
consensus_dir=target/datanode1/consensus
+trigger_root_dir=target/datanode1/ext/trigger
sync_dir=target/datanode1/sync
timestamp_precision=ms
diff --git a/server/src/test/resources/datanode2conf/iotdb-datanode.properties
b/server/src/test/resources/datanode2conf/iotdb-datanode.properties
index 59d1ee3358..927da3237f 100644
--- a/server/src/test/resources/datanode2conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode2conf/iotdb-datanode.properties
@@ -32,9 +32,9 @@ system_dir=target/datanode2/system
data_dirs=target/datanode2/data
wal_dirs=target/datanode2/wal
index_root_dir=target/datanode2/data/index
-udf_root_dir=target/datanode2/ext
+udf_root_dir=target/datanode2/ext/udf
tracing_dir=target/datanode2/data/tracing
consensus_dir=target/datanode2/consensus
-timestamp_precision=ms
+trigger_root_dir=target/datanode2/ext/trigger
sync_dir=target/datanode2/sync
timestamp_precision=ms
diff --git a/server/src/test/resources/datanode3conf/iotdb-datanode.properties
b/server/src/test/resources/datanode3conf/iotdb-datanode.properties
index fa2ab7f665..37578f269c 100644
--- a/server/src/test/resources/datanode3conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode3conf/iotdb-datanode.properties
@@ -35,5 +35,6 @@ index_root_dir=target/datanode3/data/index
udf_root_dir=target/datanode3/ext
tracing_dir=target/datanode3/data/tracing
consensus_dir=target/datanode3/consensus
+trigger_root_dir=target/datanode3/ext/trigger
sync_dir=target/datanode3/sync
timestamp_precision=ms
\ No newline at end of file
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 1bca6b5107..85315b196f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -165,7 +165,8 @@ public enum TSStatusCode {
REGION_LEADER_CHANGE_FAILED(918),
REMOVE_DATANODE_FAILED(919),
OVERLAP_WITH_EXISTING_DELETE_TIMESERIES_TASK(920),
- NOT_AVAILABLE_REGION_GROUP(921);
+ NOT_AVAILABLE_REGION_GROUP(921),
+ CREATE_TRIGGER_ERROR(922);
private int statusCode;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 94a651c79e..e4ffd2a981 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -260,9 +260,7 @@ struct TDropFunctionReq {
enum TTriggerState {
// The intermediate state of Create trigger: the trigger to be created has
not yet been activated on any DataNodes.
INACTIVE
- // The intermediate state of Create trigger, the trigger need to create has
activated on some DataNodes.
- PARTIAL_ACTIVE
- // Triggers on all DataNodes are available.
+ // The successful state of Create trigger: the trigger to be created has been
activated on some DataNodes.
ACTIVE
// The intermediate state of Drop trigger, the cluster is in the process of
removing the trigger.
DROPPING
@@ -289,7 +287,7 @@ struct TDropTriggerReq {
// Get trigger table from config node
struct TGetTriggerTableResp {
1: required common.TSStatus status
- 2: required binary triggerTable
+ 2: required list<binary> allTriggerInformation
}
// Show cluster
diff --git a/thrift/src/main/thrift/datanode.thrift
b/thrift/src/main/thrift/datanode.thrift
index 12d3a41ddb..60cb2a2d43 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -178,6 +178,10 @@ struct TActiveTriggerInstanceReq {
1: required string triggerName
}
+struct TInactiveTriggerInstanceReq {
+ 1: required string triggerName
+}
+
struct TDropTriggerInstanceReq {
1: required string triggerName
2: required bool needToDeleteJarFile
@@ -430,11 +434,20 @@ service IDataNodeRPCService {
**/
common.TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req)
+
+ /**
+ * Config node will inactivate a trigger instance on a data node.
+ *
+ * @param trigger name.
+ **/
+ common.TSStatus inactiveTriggerInstance(TInactiveTriggerInstanceReq req)
+
+
/**
- * Config node will drop a trigger on all online config nodes and data
nodes.
- *
- * @param trigger name, whether need to delete jar
- **/
+ * Config node will drop a trigger on all online config nodes and data nodes.
+ *
+ * @param trigger name, whether need to delete jar
+ **/
common.TSStatus dropTriggerInstance(TDropTriggerInstanceReq req)
/**
diff --git
a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/TriggerType.java
b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/TriggerType.java
index 84e449fa42..bf801d7f46 100644
---
a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/TriggerType.java
+++
b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/TriggerType.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.trigger.api.enums;
public enum TriggerType {
STATEFUL((byte) 0, "STATEFUL"),
- STATELESS((byte) 0, "STATELESS");
+ STATELESS((byte) 1, "STATELESS");
private final byte id;
private final String type;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
index 336f7edc83..e3504533d7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -182,7 +182,7 @@ public class Path implements Serializable, Comparable<Path>
{
serializeWithoutType(byteBuffer);
}
- public void serialize(DataOutputStream stream) throws IOException {
+ public void serialize(OutputStream stream) throws IOException {
ReadWriteIOUtils.write((byte) 3, stream); //
org.apache.iotdb.db.metadata.path#PathType
serializeWithoutType(stream);
}
@@ -198,7 +198,7 @@ public class Path implements Serializable, Comparable<Path>
{
ReadWriteIOUtils.write(fullPath, byteBuffer);
}
- protected void serializeWithoutType(DataOutputStream stream) throws
IOException {
+ protected void serializeWithoutType(OutputStream stream) throws IOException {
ReadWriteIOUtils.write(measurement, stream);
ReadWriteIOUtils.write(device, stream);
ReadWriteIOUtils.write(fullPath, stream);