This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TriggerTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b4c37fdeb41a4e7bf4ebfa07df4eae2df3cf5363
Merge: 5b686eaed6 85329a92a8
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Sep 27 10:00:38 2022 +0800

    Resolve conflicts

 .../consensus/request/ConfigPhysicalPlan.java      |  12 +
 .../consensus/request/ConfigPhysicalPlanType.java  |   5 +-
 .../request/write/sync/CreatePipeSinkPlan.java     |  57 +++++
 .../request/write/sync/DropPipeSinkPlan.java       |  56 +++++
 .../request/write/sync/GetPipeSinkPlan.java        |  55 +++++
 .../consensus/response/PipeSinkResp.java           |  31 ++-
 .../iotdb/confignode/manager/ConfigManager.java    |  43 +++-
 .../apache/iotdb/confignode/manager/IManager.java  |  28 +++
 .../iotdb/confignode/manager/SyncManager.java      | 101 ++++++++
 .../iotdb/confignode/persistence/TriggerInfo.java  |  10 +-
 .../persistence/executor/ConfigPlanExecutor.java   |  15 +-
 .../persistence/sync/ClusterSyncInfo.java          | 118 +++++++++
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  21 ++
 .../request/ConfigPhysicalPlanSerDeTest.java       |  47 ++++
 .../multileader/MultiLeaderServerImpl.java         |  26 +-
 docs/UserGuide/Cluster/Cluster-Setup.md            |  48 ++--
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |  48 ++--
 docs/zh/UserGuide/Data-Concept/Time-Partition.md   |   6 +-
 .../UserGuide/Reference/DataNode-Config-Manual.md  |  27 ---
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  |   3 +-
 .../apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java   |  18 +-
 .../db/integration/sync/IoTDBSyncReceiverIT.java   |   2 +-
 .../db/integration/sync/IoTDBSyncSenderIT.java     |   2 +-
 .../exception/sync/PipeDataLoadException.java      |   2 +-
 .../commons}/exception/sync/PipeException.java     |   2 +-
 .../exception/sync/PipeServerException.java        |   2 +-
 .../commons}/exception/sync/PipeSinkException.java |   2 +-
 .../commons/executable/ExecutableManager.java      |   8 +-
 .../apache/iotdb/commons/path/MeasurementPath.java |  26 ++
 .../apache/iotdb/commons/service/ServiceType.java  |   3 -
 .../iotdb/commons/sync/metadata/SyncMetadata.java  | 212 +++++++++++++++++
 .../apache/iotdb/commons/sync}/pipe/PipeInfo.java  |   4 +-
 .../iotdb/commons/sync}/pipe/PipeMessage.java      |   2 +-
 .../apache/iotdb/commons/sync/pipe/PipeStatus.java |  11 +-
 .../iotdb/commons/sync/pipe/SyncOperation.java     |  10 +-
 .../iotdb/commons/sync}/pipe/TsFilePipeInfo.java   |   4 +-
 .../commons/sync/pipesink}/IoTDBPipeSink.java      |  29 ++-
 .../iotdb/commons/sync/pipesink}/PipeSink.java     |  25 +-
 .../commons/sync/{ => utils}/SyncConstant.java     |   2 +-
 .../commons/sync/{ => utils}/SyncPathUtil.java     |   2 +-
 .../apache/iotdb/commons/trigger/TriggerTable.java |  21 +-
 .../trigger/service/TriggerExecutableManager.java  |  34 +--
 .../commons/utils/ThriftConfigNodeSerDeUtils.java  |  19 ++
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  52 ++++
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   6 +-
 .../engine/trigger/executor/TriggerExecutor.java   |   2 +-
 .../trigger/service/TriggerClassLoaderManager.java |   2 +-
 .../service/TriggerRegistrationService.java        |   2 +-
 .../iotdb/db/localconfignode/LocalConfigNode.java  |  18 +-
 .../metadata/visitor/SchemaExecutionVisitor.java   |  17 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |   2 +-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  |   6 +-
 .../mpp/plan/execution/config/ConfigExecution.java |   9 +-
 .../config/executor/ClusterConfigTaskExecutor.java | 132 ++++++++---
 .../executor/StandaloneConfigTaskExecutor.java     |  16 +-
 .../config/sys/sync/ShowPipeSinkTask.java          |  27 ++-
 .../memory/StatementMemorySourceVisitor.java       |   2 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  15 +-
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |   2 +-
 .../plan/statement/crud/LoadTsFileStatement.java   |   7 +-
 .../sys/sync/CreatePipeSinkStatement.java          |   2 +-
 .../statement/sys/sync/CreatePipeStatement.java    |   2 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   6 +-
 .../iotdb/db/qp/physical/sys/CreatePipePlan.java   |   2 +-
 .../db/qp/physical/sys/CreatePipeSinkPlan.java     |   2 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  17 +-
 .../db/service/DataNodeServerCommandLine.java      |   2 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   7 +-
 .../java/org/apache/iotdb/db/sync/SyncService.java |  35 +--
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |   6 +-
 .../apache/iotdb/db/sync/common/LocalSyncInfo.java | 164 +++++++++++++
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java |  92 +++----
 .../org/apache/iotdb/db/sync/common/SyncInfo.java  | 264 ---------------------
 .../db/sync/common/persistence/SyncLogReader.java  |   8 +-
 .../db/sync/common/persistence/SyncLogWriter.java  |  11 +-
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     |   2 +-
 .../sync/pipedata/queue/BufferedPipeDataQueue.java |   4 +-
 .../db/sync/receiver/load/DeletionLoader.java      |   2 +-
 .../iotdb/db/sync/receiver/load/ILoader.java       |   2 +-
 .../iotdb/db/sync/receiver/load/SchemaLoader.java  |   2 +-
 .../iotdb/db/sync/receiver/load/TsFileLoader.java  |   2 +-
 .../db/sync/sender/pipe/ExternalPipeSink.java      |  10 +-
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |  14 +-
 .../{PipeMessage.java => PipeSinkFactory.java}     |  45 ++--
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  11 +-
 .../db/sync/sender/recovery/TsFilePipeLogger.java  |   4 +-
 .../db/sync/transport/client/IoTDBSyncClient.java  |   4 +-
 .../db/sync/transport/client/SenderManager.java    |   8 +-
 .../sync/transport/client/SyncClientFactory.java   |   6 +-
 .../db/sync/transport/server/ReceiverManager.java  |   6 +-
 .../db}/trigger/service/TriggerClassLoader.java    |   2 +-
 .../trigger/service/TriggerClassLoaderManager.java |  55 ++---
 .../trigger/service/TriggerManagementService.java  |  64 ++---
 .../apache/iotdb/db/utils/sync/SyncPipeUtil.java   |  32 ++-
 .../db/metadata/path/MeasurementPathTest.java      |  46 ++++
 .../org/apache/iotdb/db/sync/SyncTestUtils.java    |   8 +-
 .../sync/pipedata/BufferedPipeDataQueueTest.java   |   4 +-
 .../{SyncInfoTest.java => LocalSyncInfoTest.java}  |  46 ++--
 .../db/sync/receiver/recovery/SyncLogTest.java     |  16 +-
 .../iotdb/db/sync/transport/SyncTransportTest.java |   6 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 .../src/main/thrift/confignode.thrift              |  34 ++-
 thrift/src/main/thrift/datanode.thrift             |   2 +
 103 files changed, 1714 insertions(+), 861 deletions(-)

diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index f7ffe6c0e2,9073554974..5a7158a462
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@@ -55,11 -55,11 +55,14 @@@ import org.apache.iotdb.confignode.cons
  import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan;
  import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan;
  import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan;
+ import 
org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
+ import 
org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
+ import 
org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
  import 
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
  import 
org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
 +import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
 +import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
 +import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
  import org.apache.iotdb.consensus.common.request.IConsensusRequest;
  import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
  import org.apache.iotdb.tsfile.utils.PublicBAOS;
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 2babc8c52f,5e3d1126f0..97aefa9cb2
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@@ -79,14 -80,15 +81,17 @@@ import org.apache.iotdb.confignode.pers
  import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
  import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
  import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
+ import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
  import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
  import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 +import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
  import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 +import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+ import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
+ import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
  import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
  import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
@@@ -149,8 -151,8 +154,10 @@@ public class ConfigManager implements I
    /** UDF */
    private final UDFManager udfManager;
  
 +  /** Manage Trigger */
 +  private final TriggerManager triggerManager;
+   /** Sync */
+   private final SyncManager syncManager;
  
    public ConfigManager() throws IOException {
      // Build the persistence module
@@@ -160,7 -162,7 +167,8 @@@
      AuthorInfo authorInfo = new AuthorInfo();
      ProcedureInfo procedureInfo = new ProcedureInfo();
      UDFInfo udfInfo = new UDFInfo();
 +    TriggerInfo triggerInfo = new TriggerInfo();
+     ClusterSyncInfo syncInfo = new ClusterSyncInfo();
  
      // Build state machine and executor
      ConfigPlanExecutor executor =
@@@ -171,7 -173,7 +179,8 @@@
              authorInfo,
              procedureInfo,
              udfInfo,
-             triggerInfo);
++            triggerInfo,
+             syncInfo);
      PartitionRegionStateMachine stateMachine = new 
PartitionRegionStateMachine(this, executor);
  
      // Build the manager module
@@@ -181,8 -183,8 +190,9 @@@
      this.permissionManager = new PermissionManager(this, authorInfo);
      this.procedureManager = new ProcedureManager(this, procedureInfo);
      this.udfManager = new UDFManager(this, udfInfo);
 +    this.triggerManager = new TriggerManager(this, triggerInfo);
      this.loadManager = new LoadManager(this);
+     this.syncManager = new SyncManager(this, syncInfo);
  
      // ConsensusManager must be initialized last, as it would load states 
from disk and reinitialize
      // above managers
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 40192d6329,38908a89a7..1f4c7fc55e
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@@ -43,12 -45,12 +45,14 @@@ import org.apache.iotdb.confignode.mana
  import org.apache.iotdb.confignode.manager.partition.PartitionManager;
  import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
  import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 +import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
  import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 +import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+ import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
+ import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
  import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
  import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index e30364410d,0000000000..9786a058d5
mode 100644,000000..100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@@ -1,153 -1,0 +1,147 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.confignode.persistence;
 +
 +import org.apache.iotdb.common.rpc.thrift.TSStatus;
- import org.apache.iotdb.commons.exception.StartupException;
 +import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 +import org.apache.iotdb.commons.trigger.TriggerInformation;
 +import org.apache.iotdb.commons.trigger.TriggerTable;
 +import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
 +import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 +import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 +import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
 +import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
 +import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
 +import org.apache.iotdb.rpc.TSStatusCode;
 +
 +import org.apache.thrift.TException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.concurrent.locks.ReentrantLock;
 +
 +public class TriggerInfo implements SnapshotProcessor {
 +  private static final Logger LOGGER = 
LoggerFactory.getLogger(TriggerInfo.class);
 +
 +  private static final ConfigNodeConfig CONFIG_NODE_CONF =
 +      ConfigNodeDescriptor.getInstance().getConf();
 +
 +  private final TriggerTable triggerTable;
 +  private final Map<String, String> existedJarToMD5;
 +  // private final Map<String, AtomicInteger> jarReferenceTable;
 +
 +  private final TriggerExecutableManager triggerExecutableManager;
 +
-   private ReentrantLock triggerTableLock = new ReentrantLock();
++  private final ReentrantLock triggerTableLock = new ReentrantLock();
 +
-   public TriggerInfo() {
++  public TriggerInfo() throws IOException {
 +    triggerTable = new TriggerTable();
 +    existedJarToMD5 = new HashMap<>();
 +    // jarReferenceTable = new ConcurrentHashMap<>();
 +    triggerExecutableManager =
 +        TriggerExecutableManager.setupAndGetInstance(
 +            CONFIG_NODE_CONF.getTemporaryLibDir(), 
CONFIG_NODE_CONF.getTriggerLibDir());
-     try {
-       triggerExecutableManager.start();
-     } catch (StartupException e) {
-       throw new RuntimeException(e);
-     }
 +  }
 +
 +  public void acquireTriggerTableLock() {
 +    triggerTableLock.lock();
 +  }
 +
 +  public void releaseTriggerTableLock() {
 +    triggerTableLock.unlock();
 +  }
 +
 +  @Override
 +  public boolean processTakeSnapshot(File snapshotDir) throws TException, 
IOException {
 +    return false;
 +  }
 +
 +  @Override
 +  public void processLoadSnapshot(File snapshotDir) throws TException, 
IOException {}
 +
 +  /**
 +   * Validate whether the trigger can be created
 +   *
 +   * @param triggerName
 +   * @param jarName
 +   * @param jarMD5
 +   * @return whether the JarFile in this request need to be saved
 +   */
 +  public boolean validate(String triggerName, String jarName, String jarMD5) {
 +    if (triggerTable.containsTrigger(triggerName)) {
 +      throw new TriggerManagementException(
 +          String.format(
 +              "Failed to create trigger [%s], the same name trigger has been 
created",
 +              triggerName));
 +    }
 +
 +    if (existedJarToMD5.containsKey(jarName)) {
 +      if (existedJarToMD5.get(jarName).equals(jarMD5)) {
 +        return false;
 +      } else {
 +        throw new TriggerManagementException(
 +            String.format(
 +                "Failed to create trigger [%s], the same name Jar [%s] but 
different MD5 [%s] has existed",
 +                triggerName, jarName, jarMD5));
 +      }
 +    } else {
 +      return true;
 +    }
 +  }
 +
 +  public TSStatus addTriggerInTable(AddTriggerInTablePlan physicalPlan) {
 +    try {
 +      TriggerInformation triggerInformation = 
physicalPlan.getTriggerInformation();
 +      String triggerName = triggerInformation.getTriggerName();
 +      triggerTable.addTriggerInformation(triggerName, triggerInformation);
 +      existedJarToMD5.put(triggerName, triggerInformation.getJarFileMD5());
 +      if (physicalPlan.getJarFile() != null) {
 +        triggerExecutableManager.writeToLibDir(
 +            ByteBuffer.wrap(physicalPlan.getJarFile().getValues()),
 +            triggerInformation.getJarName());
 +      }
 +      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
 +    } catch (Exception e) {
 +      final String errorMessage =
 +          String.format(
 +              "Failed to create trigger [%s] when addTriggerInTable, because 
of exception: %s",
 +              physicalPlan.getTriggerInformation().getTriggerName(), e);
 +      LOGGER.warn(errorMessage, e);
 +      return new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
 +          .setMessage(errorMessage);
 +    }
 +  }
 +
 +  public TSStatus deleteTriggerInTable(DeleteTriggerInTablePlan physicalPlan) 
{
 +    triggerTable.deleteTriggerInformation(physicalPlan.getTriggerName());
 +    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
 +  }
 +
 +  public TSStatus updateTriggerStateInTable(UpdateTriggerStateInTablePlan 
physicalPlan) {
 +    triggerTable.setTriggerState(physicalPlan.getTriggerName(), 
physicalPlan.getTriggerState());
 +    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
 +  }
 +}
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index acffb3a7fa,6f0023c7bf..b39653c310
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@@ -56,11 -56,11 +56,14 @@@ import org.apache.iotdb.confignode.cons
  import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan;
  import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan;
  import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan;
+ import 
org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
+ import 
org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
+ import 
org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
  import 
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
  import 
org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
 +import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
 +import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
 +import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
  import 
org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
  import 
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
  import org.apache.iotdb.confignode.persistence.AuthorInfo;
@@@ -105,7 -105,7 +109,8 @@@ public class ConfigPlanExecutor 
  
    private final UDFInfo udfInfo;
  
 +  private final TriggerInfo triggerInfo;
+   private final ClusterSyncInfo syncInfo;
  
    public ConfigPlanExecutor(
        NodeInfo nodeInfo,
@@@ -114,14 -114,14 +119,16 @@@
        AuthorInfo authorInfo,
        ProcedureInfo procedureInfo,
        UDFInfo udfInfo,
-       TriggerInfo triggerInfo) {
++      TriggerInfo triggerInfo,
+       ClusterSyncInfo syncInfo) {
      this.nodeInfo = nodeInfo;
      this.clusterSchemaInfo = clusterSchemaInfo;
      this.partitionInfo = partitionInfo;
      this.authorInfo = authorInfo;
      this.procedureInfo = procedureInfo;
      this.udfInfo = udfInfo;
 +    this.triggerInfo = triggerInfo;
+     this.syncInfo = syncInfo;
    }
  
    public DataSet executeQueryPlan(ConfigPhysicalPlan req)
diff --cc 
node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
index 447bd8994e,533020680f..5f7f41983e
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
@@@ -29,9 -29,11 +29,12 @@@ import org.apache.iotdb.tsfile.write.sc
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
+ import java.io.ByteArrayOutputStream;
+ import java.io.DataOutputStream;
  import java.io.IOException;
 +import java.io.OutputStream;
  import java.nio.ByteBuffer;
+ import java.nio.charset.StandardCharsets;
  
  public class MeasurementPath extends PartialPath {
  
diff --cc 
node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
index 375f7e31f8,cee7bb6b4f..ae9c20a9ff
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
@@@ -33,12 -30,8 +33,8 @@@ import java.util.stream.Collectors
  public class TriggerTable {
    private final Map<String, TriggerInformation> triggerTable;
  
-   // todo: Maintain a PatternTree: PathPattern -> List<String> triggerNames
-   // Given a PathPattern, return the triggerNames of triggers whose 
PathPatterns match the given
-   // one.
- 
    public TriggerTable() {
 -    triggerTable = new HashMap<>();
 +    triggerTable = new ConcurrentHashMap<>();
    }
  
    public TriggerTable(Map<String, TriggerInformation> triggerTable) {
diff --cc 
server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index ae81a5d499,319b8bc309..f1a6b5194f
--- 
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@@ -22,14 -22,8 +22,9 @@@ package org.apache.iotdb.db.trigger.ser
  import org.apache.iotdb.commons.trigger.TriggerInformation;
  import org.apache.iotdb.commons.trigger.TriggerTable;
  import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
- import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
- import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager;
  import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
  import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 +import org.apache.iotdb.db.conf.IoTDBConfig;
  import org.apache.iotdb.db.conf.IoTDBDescriptor;
  import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
  import org.apache.iotdb.trigger.api.Trigger;
@@@ -80,11 -72,6 +75,11 @@@ public class TriggerManagementService 
        acquireLock();
        checkIfRegistered(triggerInformation);
        doRegister(triggerInformation);
 +    } catch (Exception e) {
 +      LOGGER.warn(
-           "Failed to register trigger({}) on data node, the cause is: {}",
++          "Failed to register trigger({}) on data node, the cause is ",
 +          triggerInformation.getTriggerName(),
-           e.getMessage());
++          e);
      } finally {
        releaseLock();
      }
@@@ -94,11 -81,6 +89,8 @@@
      try {
        acquireLock();
        triggerTable.setTriggerState(triggerName, TTriggerState.ACTIVE);
 +    } catch (Exception e) {
-       LOGGER.warn(
-           "Failed to active trigger({}) on data node, the cause is: {}",
-           triggerName,
-           e.getMessage());
++      LOGGER.warn("Failed to active trigger({}) on data node, the cause is ", 
triggerName, e);
      } finally {
        releaseLock();
      }
@@@ -108,11 -90,6 +100,8 @@@
      try {
        acquireLock();
        triggerTable.setTriggerState(triggerName, TTriggerState.INACTIVE);
 +    } catch (Exception e) {
-       LOGGER.warn(
-           "Failed to active trigger({}) on data node, the cause is: {}",
-           triggerName,
-           e.getMessage());
++      LOGGER.warn("Failed to active trigger({}) on data node, the cause is: 
", triggerName, e);
      } finally {
        releaseLock();
      }
@@@ -158,8 -134,8 +146,8 @@@
                      "Failed to registered trigger %s, "
                          + "because error occurred when trying to compute md5 
of jar file for trigger %s ",
                      triggerName, triggerName);
--            LOGGER.warn(errorMessage);
 -            throw e;
++            LOGGER.warn(errorMessage, e);
 +            throw new TriggerManagementException(errorMessage);
            }
          }
  
@@@ -219,20 -193,7 +205,6 @@@
                "Failed to reflect trigger instance with className(%s), because 
%s", className, e));
      }
    }
- 
-   @Override
-   public void start() throws StartupException {}
- 
-   @Override
-   public void stop() {
-     // nothing to do
-   }
- 
-   @Override
-   public ServiceType getID() {
-     return ServiceType.TRIGGER_REGISTRATION_SERVICE;
-   }
--
    
/////////////////////////////////////////////////////////////////////////////////////////////////
    // singleton instance holder
    
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --cc thrift/src/main/thrift/datanode.thrift
index 5e285c4996,12d3a41ddb..60cb2a2d43
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@@ -434,18 -430,11 +434,20 @@@ service IDataNodeRPCService 
     **/
    common.TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req)
  
++
 +  /**
 +   * Config node will inactive a trigger instance on data node.
 +   *
 +   * @param trigger name.
 +   **/
 +  common.TSStatus inactiveTriggerInstance(TInactiveTriggerInstanceReq req)
 +
++
    /**
 -    * Config node will drop a trigger on all online config nodes and data 
nodes.
 -    *
 -    * @param trigger name, whether need to delete jar
 -    **/
 +   * Config node will drop a trigger on all online config nodes and data 
nodes.
 +   *
 +   * @param trigger name, whether need to delete jar
 +   **/
    common.TSStatus dropTriggerInstance(TDropTriggerInstanceReq req)
  
    /**

Reply via email to