This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TriggerTest in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b4c37fdeb41a4e7bf4ebfa07df4eae2df3cf5363 Merge: 5b686eaed6 85329a92a8 Author: JackieTien97 <[email protected]> AuthorDate: Tue Sep 27 10:00:38 2022 +0800 Resolve conflicts .../consensus/request/ConfigPhysicalPlan.java | 12 + .../consensus/request/ConfigPhysicalPlanType.java | 5 +- .../request/write/sync/CreatePipeSinkPlan.java | 57 +++++ .../request/write/sync/DropPipeSinkPlan.java | 56 +++++ .../request/write/sync/GetPipeSinkPlan.java | 55 +++++ .../consensus/response/PipeSinkResp.java | 31 ++- .../iotdb/confignode/manager/ConfigManager.java | 43 +++- .../apache/iotdb/confignode/manager/IManager.java | 28 +++ .../iotdb/confignode/manager/SyncManager.java | 101 ++++++++ .../iotdb/confignode/persistence/TriggerInfo.java | 10 +- .../persistence/executor/ConfigPlanExecutor.java | 15 +- .../persistence/sync/ClusterSyncInfo.java | 118 +++++++++ .../thrift/ConfigNodeRPCServiceProcessor.java | 21 ++ .../request/ConfigPhysicalPlanSerDeTest.java | 47 ++++ .../multileader/MultiLeaderServerImpl.java | 26 +- docs/UserGuide/Cluster/Cluster-Setup.md | 48 ++-- docs/zh/UserGuide/Cluster/Cluster-Setup.md | 48 ++-- docs/zh/UserGuide/Data-Concept/Time-Partition.md | 6 +- .../UserGuide/Reference/DataNode-Config-Manual.md | 27 --- .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 3 +- .../apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java | 18 +- .../db/integration/sync/IoTDBSyncReceiverIT.java | 2 +- .../db/integration/sync/IoTDBSyncSenderIT.java | 2 +- .../exception/sync/PipeDataLoadException.java | 2 +- .../commons}/exception/sync/PipeException.java | 2 +- .../exception/sync/PipeServerException.java | 2 +- .../commons}/exception/sync/PipeSinkException.java | 2 +- .../commons/executable/ExecutableManager.java | 8 +- .../apache/iotdb/commons/path/MeasurementPath.java | 26 ++ .../apache/iotdb/commons/service/ServiceType.java | 3 - .../iotdb/commons/sync/metadata/SyncMetadata.java | 212 +++++++++++++++++ .../apache/iotdb/commons/sync}/pipe/PipeInfo.java | 4 +- .../iotdb/commons/sync}/pipe/PipeMessage.java | 2 +- .../apache/iotdb/commons/sync/pipe/PipeStatus.java | 11 +- .../iotdb/commons/sync/pipe/SyncOperation.java | 10 +- .../iotdb/commons/sync}/pipe/TsFilePipeInfo.java | 4 +- .../commons/sync/pipesink}/IoTDBPipeSink.java | 29 ++- .../iotdb/commons/sync/pipesink}/PipeSink.java | 25 +- .../commons/sync/{ => utils}/SyncConstant.java | 2 +- .../commons/sync/{ => utils}/SyncPathUtil.java | 2 +- .../apache/iotdb/commons/trigger/TriggerTable.java | 21 +- .../trigger/service/TriggerExecutableManager.java | 34 +-- .../commons/utils/ThriftConfigNodeSerDeUtils.java | 19 ++ .../apache/iotdb/db/client/ConfigNodeClient.java | 52 ++++ .../iotdb/db/engine/storagegroup/DataRegion.java | 6 +- .../engine/trigger/executor/TriggerExecutor.java | 2 +- .../trigger/service/TriggerClassLoaderManager.java | 2 +- .../service/TriggerRegistrationService.java | 2 +- .../iotdb/db/localconfignode/LocalConfigNode.java | 18 +- .../metadata/visitor/SchemaExecutionVisitor.java | 17 +- .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 2 +- .../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 6 +- .../mpp/plan/execution/config/ConfigExecution.java | 9 +- .../config/executor/ClusterConfigTaskExecutor.java | 132 ++++++++--- .../executor/StandaloneConfigTaskExecutor.java | 16 +- .../config/sys/sync/ShowPipeSinkTask.java | 27 ++- .../memory/StatementMemorySourceVisitor.java | 2 +- .../iotdb/db/mpp/plan/parser/ASTVisitor.java | 15 +- .../scheduler/load/LoadTsFileDispatcherImpl.java | 2 +- .../plan/statement/crud/LoadTsFileStatement.java | 7 +- .../sys/sync/CreatePipeSinkStatement.java | 2 +- .../statement/sys/sync/CreatePipeStatement.java | 2 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 6 +- .../iotdb/db/qp/physical/sys/CreatePipePlan.java | 2 +- .../db/qp/physical/sys/CreatePipeSinkPlan.java | 2 +- .../java/org/apache/iotdb/db/service/DataNode.java | 17 +- .../db/service/DataNodeServerCommandLine.java | 2 +- .../impl/DataNodeInternalRPCServiceImpl.java | 7 +- .../java/org/apache/iotdb/db/sync/SyncService.java | 35 +-- .../iotdb/db/sync/common/ISyncInfoFetcher.java | 6 +- .../apache/iotdb/db/sync/common/LocalSyncInfo.java | 164 +++++++++++++ .../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 92 +++---- .../org/apache/iotdb/db/sync/common/SyncInfo.java | 264 --------------------- .../db/sync/common/persistence/SyncLogReader.java | 8 +- .../db/sync/common/persistence/SyncLogWriter.java | 11 +- .../iotdb/db/sync/pipedata/TsFilePipeData.java | 2 +- .../sync/pipedata/queue/BufferedPipeDataQueue.java | 4 +- .../db/sync/receiver/load/DeletionLoader.java | 2 +- .../iotdb/db/sync/receiver/load/ILoader.java | 2 +- .../iotdb/db/sync/receiver/load/SchemaLoader.java | 2 +- .../iotdb/db/sync/receiver/load/TsFileLoader.java | 2 +- .../db/sync/sender/pipe/ExternalPipeSink.java | 10 +- .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 14 +- .../{PipeMessage.java => PipeSinkFactory.java} | 45 ++-- .../iotdb/db/sync/sender/pipe/TsFilePipe.java | 11 +- .../db/sync/sender/recovery/TsFilePipeLogger.java | 4 +- .../db/sync/transport/client/IoTDBSyncClient.java | 4 +- .../db/sync/transport/client/SenderManager.java | 8 +- .../sync/transport/client/SyncClientFactory.java | 6 +- .../db/sync/transport/server/ReceiverManager.java | 6 +- .../db}/trigger/service/TriggerClassLoader.java | 2 +- .../trigger/service/TriggerClassLoaderManager.java | 55 ++--- .../trigger/service/TriggerManagementService.java | 64 ++--- .../apache/iotdb/db/utils/sync/SyncPipeUtil.java | 32 ++- .../db/metadata/path/MeasurementPathTest.java | 46 ++++ .../org/apache/iotdb/db/sync/SyncTestUtils.java | 8 +- .../sync/pipedata/BufferedPipeDataQueueTest.java | 4 +- .../{SyncInfoTest.java => LocalSyncInfoTest.java} | 46 ++-- .../db/sync/receiver/recovery/SyncLogTest.java | 16 +- .../iotdb/db/sync/transport/SyncTransportTest.java | 6 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 + .../src/main/thrift/confignode.thrift | 34 ++- thrift/src/main/thrift/datanode.thrift | 2 + 103 files changed, 1714 insertions(+), 861 deletions(-) diff --cc confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index f7ffe6c0e2,9073554974..5a7158a462 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@@ -55,11 -55,11 +55,14 @@@ import org.apache.iotdb.confignode.cons import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan; import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; + import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan; + import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan; + import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan; +import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan; +import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan; +import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException; import org.apache.iotdb.tsfile.utils.PublicBAOS; diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 2babc8c52f,5e3d1126f0..97aefa9cb2 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@@ -79,14 -80,15 +81,17 @@@ import org.apache.iotdb.confignode.pers import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo; + import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq; +import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; + import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq; + import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; @@@ -149,8 -151,8 +154,10 @@@ public class ConfigManager implements I /** UDF */ private final UDFManager udfManager; + /** Manage Trigger */ + private final TriggerManager triggerManager; + /** Sync */ + private final SyncManager syncManager; public ConfigManager() throws IOException { // Build the persistence module @@@ -160,7 -162,7 +167,8 @@@ AuthorInfo authorInfo = new AuthorInfo(); ProcedureInfo procedureInfo = new ProcedureInfo(); UDFInfo udfInfo = new UDFInfo(); + TriggerInfo triggerInfo = new TriggerInfo(); + ClusterSyncInfo syncInfo = new ClusterSyncInfo(); // Build state machine and executor ConfigPlanExecutor executor = @@@ -171,7 -173,7 +179,8 @@@ authorInfo, procedureInfo, udfInfo, - triggerInfo); ++ triggerInfo, + syncInfo); PartitionRegionStateMachine stateMachine = new PartitionRegionStateMachine(this, executor); // Build the manager module @@@ -181,8 -183,8 +190,9 @@@ this.permissionManager = new PermissionManager(this, authorInfo); this.procedureManager = new ProcedureManager(this, procedureInfo); this.udfManager = new UDFManager(this, udfInfo); + this.triggerManager = new TriggerManager(this, triggerInfo); this.loadManager = new LoadManager(this); + this.syncManager = new SyncManager(this, syncInfo); // ConsensusManager must be initialized last, as it would load states from disk and reinitialize // above managers diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 40192d6329,38908a89a7..1f4c7fc55e --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@@ -43,12 -45,12 +45,14 @@@ import org.apache.iotdb.confignode.mana import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq; +import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; + import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq; + import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; diff --cc confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java index e30364410d,0000000000..9786a058d5 mode 100644,000000..100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java @@@ -1,153 -1,0 +1,147 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.persistence; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; - import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.snapshot.SnapshotProcessor; +import org.apache.iotdb.commons.trigger.TriggerInformation; +import org.apache.iotdb.commons.trigger.TriggerTable; +import org.apache.iotdb.commons.trigger.exception.TriggerManagementException; +import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan; +import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan; +import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +public class TriggerInfo implements SnapshotProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(TriggerInfo.class); + + private static final ConfigNodeConfig CONFIG_NODE_CONF = + ConfigNodeDescriptor.getInstance().getConf(); + + private final TriggerTable triggerTable; + private final Map<String, String> existedJarToMD5; + // private final Map<String, AtomicInteger> jarReferenceTable; + + private final TriggerExecutableManager triggerExecutableManager; + - private ReentrantLock triggerTableLock = new ReentrantLock(); ++ private final ReentrantLock triggerTableLock = new ReentrantLock(); + - public TriggerInfo() { ++ public TriggerInfo() throws IOException { + triggerTable = new TriggerTable(); + existedJarToMD5 = new HashMap<>(); + // jarReferenceTable = new ConcurrentHashMap<>(); + triggerExecutableManager = + TriggerExecutableManager.setupAndGetInstance( + CONFIG_NODE_CONF.getTemporaryLibDir(), CONFIG_NODE_CONF.getTriggerLibDir()); - try { - triggerExecutableManager.start(); - } catch (StartupException e) { - throw new RuntimeException(e); - } + } + + public void acquireTriggerTableLock() { + triggerTableLock.lock(); + } + + public void releaseTriggerTableLock() { + triggerTableLock.unlock(); + } + + @Override + public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException { + return false; + } + + @Override + public void processLoadSnapshot(File snapshotDir) throws TException, IOException {} + + /** + * Validate whether the trigger can be created + * + * @param triggerName + * @param jarName + * @param jarMD5 + * @return whether the JarFile in this request need to be saved + */ + public boolean validate(String triggerName, String jarName, String jarMD5) { + if (triggerTable.containsTrigger(triggerName)) { + throw new TriggerManagementException( + String.format( + "Failed to create trigger [%s], the same name trigger has been created", + triggerName)); + } + + if (existedJarToMD5.containsKey(jarName)) { + if (existedJarToMD5.get(jarName).equals(jarMD5)) { + return false; + } else { + throw new TriggerManagementException( + String.format( + "Failed to create trigger [%s], the same name Jar [%s] but different MD5 [%s] has existed", + triggerName, jarName, jarMD5)); + } + } else { + return true; + } + } + + public TSStatus addTriggerInTable(AddTriggerInTablePlan physicalPlan) { + try { + TriggerInformation triggerInformation = physicalPlan.getTriggerInformation(); + String triggerName = triggerInformation.getTriggerName(); + triggerTable.addTriggerInformation(triggerName, triggerInformation); + existedJarToMD5.put(triggerName, triggerInformation.getJarFileMD5()); + if (physicalPlan.getJarFile() != null) { + triggerExecutableManager.writeToLibDir( + ByteBuffer.wrap(physicalPlan.getJarFile().getValues()), + triggerInformation.getJarName()); + } + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } catch (Exception e) { + final String errorMessage = + String.format( + "Failed to create trigger [%s] when addTriggerInTable, because of exception: %s", + physicalPlan.getTriggerInformation().getTriggerName(), e); + LOGGER.warn(errorMessage, e); + return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(errorMessage); + } + } + + public TSStatus deleteTriggerInTable(DeleteTriggerInTablePlan physicalPlan) { + triggerTable.deleteTriggerInformation(physicalPlan.getTriggerName()); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + public TSStatus updateTriggerStateInTable(UpdateTriggerStateInTablePlan physicalPlan) { + triggerTable.setTriggerState(physicalPlan.getTriggerName(), physicalPlan.getTriggerState()); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } +} diff --cc confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index acffb3a7fa,6f0023c7bf..b39653c310 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@@ -56,11 -56,11 +56,14 @@@ import org.apache.iotdb.confignode.cons import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan; import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; + import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan; + import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan; + import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan; +import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan; +import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan; +import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan; import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp; import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException; import org.apache.iotdb.confignode.persistence.AuthorInfo; @@@ -105,7 -105,7 +109,8 @@@ public class ConfigPlanExecutor private final UDFInfo udfInfo; + private final TriggerInfo triggerInfo; + private final ClusterSyncInfo syncInfo; public ConfigPlanExecutor( NodeInfo nodeInfo, @@@ -114,14 -114,14 +119,16 @@@ AuthorInfo authorInfo, ProcedureInfo procedureInfo, UDFInfo udfInfo, - TriggerInfo triggerInfo) { ++ TriggerInfo triggerInfo, + ClusterSyncInfo syncInfo) { this.nodeInfo = nodeInfo; this.clusterSchemaInfo = clusterSchemaInfo; this.partitionInfo = partitionInfo; this.authorInfo = authorInfo; this.procedureInfo = procedureInfo; this.udfInfo = udfInfo; + this.triggerInfo = triggerInfo; + this.syncInfo = syncInfo; } public DataSet executeQueryPlan(ConfigPhysicalPlan req) diff --cc node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java index 447bd8994e,533020680f..5f7f41983e --- a/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java @@@ -29,9 -29,11 +29,12 @@@ import org.apache.iotdb.tsfile.write.sc import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.io.ByteArrayOutputStream; + import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; + import java.nio.charset.StandardCharsets; public class MeasurementPath extends PartialPath { diff --cc node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java index 375f7e31f8,cee7bb6b4f..ae9c20a9ff --- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java @@@ -33,12 -30,8 +33,8 @@@ import java.util.stream.Collectors public class TriggerTable { private final Map<String, TriggerInformation> triggerTable; - // todo: Maintain a PatternTree: PathPattern -> List<String> triggerNames - // Given a PathPattern, return the triggerNames of triggers whose PathPatterns match the given - // one. - public TriggerTable() { - triggerTable = new HashMap<>(); + triggerTable = new ConcurrentHashMap<>(); } public TriggerTable(Map<String, TriggerInformation> triggerTable) { diff --cc server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java index ae81a5d499,319b8bc309..f1a6b5194f --- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java +++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java @@@ -22,14 -22,8 +22,9 @@@ package org.apache.iotdb.db.trigger.ser import org.apache.iotdb.commons.trigger.TriggerInformation; import org.apache.iotdb.commons.trigger.TriggerTable; import org.apache.iotdb.commons.trigger.exception.TriggerManagementException; - import org.apache.iotdb.commons.trigger.service.TriggerClassLoader; - import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager; import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager; import org.apache.iotdb.confignode.rpc.thrift.TTriggerState; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.trigger.executor.TriggerExecutor; import org.apache.iotdb.trigger.api.Trigger; @@@ -80,11 -72,6 +75,11 @@@ public class TriggerManagementService acquireLock(); checkIfRegistered(triggerInformation); doRegister(triggerInformation); + } catch (Exception e) { + LOGGER.warn( - "Failed to register trigger({}) on data node, the cause is: {}", ++ "Failed to register trigger({}) on data node, the cause is ", + triggerInformation.getTriggerName(), - e.getMessage()); ++ e); } finally { releaseLock(); } @@@ -94,11 -81,6 +89,8 @@@ try { acquireLock(); triggerTable.setTriggerState(triggerName, TTriggerState.ACTIVE); + } catch (Exception e) { - LOGGER.warn( - "Failed to active trigger({}) on data node, the cause is: {}", - triggerName, - e.getMessage()); ++ LOGGER.warn("Failed to active trigger({}) on data node, the cause is ", triggerName, e); } finally { releaseLock(); } @@@ -108,11 -90,6 +100,8 @@@ try { acquireLock(); triggerTable.setTriggerState(triggerName, TTriggerState.INACTIVE); + } catch (Exception e) { - LOGGER.warn( - "Failed to active trigger({}) on data node, the cause is: {}", - triggerName, - e.getMessage()); ++ LOGGER.warn("Failed to active trigger({}) on data node, the cause is: ", triggerName, e); } finally { releaseLock(); } @@@ -158,8 -134,8 +146,8 @@@ "Failed to registered trigger %s, " + "because error occurred when trying to compute md5 of jar file for trigger %s ", triggerName, triggerName); -- LOGGER.warn(errorMessage); - throw e; ++ LOGGER.warn(errorMessage, e); + throw new TriggerManagementException(errorMessage); } } @@@ -219,20 -193,7 +205,6 @@@ "Failed to reflect trigger instance with className(%s), because %s", className, e)); } } - - @Override - public void start() throws StartupException {} - - @Override - public void stop() { - // nothing to do - } - - @Override - public ServiceType getID() { - return ServiceType.TRIGGER_REGISTRATION_SERVICE; - } -- ///////////////////////////////////////////////////////////////////////////////////////////////// // singleton instance holder ///////////////////////////////////////////////////////////////////////////////////////////////// diff --cc thrift/src/main/thrift/datanode.thrift index 5e285c4996,12d3a41ddb..60cb2a2d43 --- a/thrift/src/main/thrift/datanode.thrift +++ b/thrift/src/main/thrift/datanode.thrift @@@ -434,18 -430,11 +434,20 @@@ service IDataNodeRPCService **/ common.TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req) ++ + /** + * Config node will inactive a trigger instance on data node. + * + * @param trigger name. + **/ + common.TSStatus inactiveTriggerInstance(TInactiveTriggerInstanceReq req) + ++ /** - * Config node will drop a trigger on all online config nodes and data nodes. - * - * @param trigger name, whether need to delete jar - **/ + * Config node will drop a trigger on all online config nodes and data nodes. + * + * @param trigger name, whether need to delete jar + **/ common.TSStatus dropTriggerInstance(TDropTriggerInstanceReq req) /**
