This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9ee5295fe61a69d702b381c5febe6436a3e14a20
Merge: 58289d6d0a d0d2ec30a8
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Oct 20 14:02:01 2022 +0800

    resolve conflicts

 .github/workflows/standalone-it-for-mpp.yml        |  22 +-
 .gitignore                                         |   1 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  20 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   1 +
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |  20 +-
 .../resources/conf/iotdb-confignode.properties     |  13 +-
 .../confignode/client/DataNodeRequestType.java     |   1 +
 .../client/async/AsyncDataNodeClientPool.java      |   7 +
 .../client/async/handlers/AsyncClientHandler.java  |   1 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  50 +++
 .../confignode/conf/ConfigNodeDescriptor.java      |  32 +-
 .../confignode/conf/ConfigNodeRemoveCheck.java     |   4 +-
 .../confignode/conf/SystemPropertiesUtils.java     |   2 +-
 .../consensus/request/ConfigPhysicalPlan.java      |  25 +-
 .../consensus/request/ConfigPhysicalPlanType.java  |   7 +-
 .../{GetRoutingPlan.java => GetRegionIdPlan.java}  |  10 +-
 ...ePlan.java => GetTransferringTriggersPlan.java} |   8 +-
 ...rTablePlan.java => GetTriggerLocationPlan.java} |  30 +-
 .../request/read/GetTriggerTablePlan.java          |  22 +-
 .../sync/DropPipePlan.java}                        |  28 +-
 .../write/trigger/UpdateTriggerLocationPlan.java   |  76 ++++
 .../trigger/UpdateTriggersOnTransferNodesPlan.java |  75 ++++
 .../{GetRoutingResp.java => GetRegionIdResp.java}  |  10 +-
 .../response/TransferringTriggersResp.java         |  26 +-
 ...etRoutingResp.java => TriggerLocationResp.java} |  30 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  61 ++-
 .../iotdb/confignode/manager/ConsensusManager.java |  12 +-
 .../apache/iotdb/confignode/manager/IManager.java  |  13 +-
 .../iotdb/confignode/manager/SyncManager.java      |   8 +-
 .../iotdb/confignode/manager/TriggerManager.java   | 105 ++++-
 .../iotdb/confignode/manager/load/LoadManager.java |   6 +-
 .../manager/load/balancer/RouteBalancer.java       |   6 +-
 .../manager/load/balancer/router/IRouter.java      |   2 +-
 .../load/balancer/router/LazyGreedyRouter.java     |   2 +-
 .../manager/load/balancer/router/LeaderRouter.java |   2 +-
 .../balancer/router/LoadScoreGreedyRouter.java     |   2 +-
 .../iotdb/confignode/manager/node/NodeManager.java |  43 ++-
 .../manager/partition/PartitionManager.java        |   8 +-
 .../iotdb/confignode/persistence/TriggerInfo.java  |  67 +++-
 .../persistence/executor/ConfigPlanExecutor.java   |  27 +-
 .../persistence/partition/PartitionInfo.java       |  12 +-
 .../partition/StorageGroupPartitionTable.java      |   6 +-
 .../persistence/sync/ClusterSyncInfo.java          |  43 +--
 .../confignode/procedure/ProcedureExecutor.java    |   2 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   3 -
 .../procedure/impl/CreateTriggerProcedure.java     |   5 +-
 .../impl/sync/AbstractOperatePipeProcedure.java    |   6 +-
 .../procedure/impl/sync/CreatePipeProcedure.java   |   3 +-
 .../procedure/impl/sync/DropPipeProcedure.java     |  12 +-
 .../iotdb/confignode/service/ConfigNode.java       |   7 +-
 .../confignode/service/ConfigNodeCommandLine.java  |   5 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  26 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  98 ++++-
 .../load/balancer/router/LazyGreedyRouterTest.java |   8 +-
 .../load/balancer/router/LeaderRouterTest.java     |   6 +-
 .../balancer/router/LoadScoreGreedyRouterTest.java |   2 +-
 .../confignode1conf/iotdb-confignode.properties    |   3 +-
 .../confignode2conf/iotdb-confignode.properties    |   3 +-
 .../confignode3conf/iotdb-confignode.properties    |   1 +
 consensus/pom.xml                                  |   2 +-
 .../apache/iotdb/consensus/config/RatisConfig.java |  55 ++-
 .../multileader/logdispatcher/LogDispatcher.java   |  18 +-
 .../multileader/logdispatcher/PendingBatch.java    |  11 +-
 .../ratis/FileInfoWithDelayedMd5Computing.java     |  75 ----
 .../iotdb/consensus/ratis/RatisConsensus.java      |   7 +-
 .../iotdb/consensus/ratis/SnapshotStorage.java     |   2 +-
 .../org/apache/iotdb/consensus/ratis/Utils.java    |   8 +
 .../multileader/logdispatcher/SyncStatusTest.java  |  25 +-
 .../apache/iotdb/consensus/ratis/SnapshotTest.java |   3 +
 docs/UserGuide/Cluster/Cluster-Concept.md          |  18 +-
 docs/UserGuide/Cluster/Cluster-Setup.md            |  12 +-
 docs/UserGuide/Delete-Data/TTL.md                  |  16 +-
 .../Maintenance-Tools/Maintenance-Command.md       | 100 ++++-
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |  12 +-
 docs/zh/UserGuide/Delete-Data/TTL.md               |  18 +-
 .../Maintenance-Tools/Maintenance-Command.md       |  99 ++++-
 .../iotdb/trigger/ClusterAlertingExample.java      |   8 +-
 .../org/apache/iotdb/trigger/LoggerTrigger.java    |  86 +++++
 integration-test/import-control.xml                |   2 +-
 .../trigger/example/TriggerFireTimesCounter.java   |  87 +++++
 .../java/org/apache/iotdb/it/env/MppConfig.java    |  11 +-
 .../org/apache/iotdb/it/utils/TsFileGenerator.java | 232 +++++++++++
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   8 +-
 .../confignode/it/IoTDBClusterPartitionIT.java     |  63 +--
 .../confignode/it/IoTDBConfigNodeSnapshotIT.java   |   4 +-
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  | 340 +++++++----------
 .../java/org/apache/iotdb/db/it/IoTDBFilterIT.java |  24 +-
 .../org/apache/iotdb/db/it/sync/IoTDBPipeIT.java   |  69 +++-
 .../apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java   |   7 +-
 .../db/it/trigger/IoTDBTriggerExecutionIT.java     | 265 +++++++++++++
 .../iotdb/session/it/IoTDBSessionInsertNulIT.java  |   2 +-
 .../IoTDBSessionInsertWithTriggerExecutionIT.java  | 337 ++++++++++++++++
 .../src/test/resources/TriggerFireTimesCounter.jar | Bin 0 -> 1324 bytes
 ...IoTDBLoadExternalTsFileWithTimePartitionIT.java | 329 ----------------
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   5 +
 ...ception.java => PipeAlreadyExistException.java} |  12 +-
 .../exception/sync/PipeNotExistException.java      |   4 -
 .../sync/PipeSinkAlreadyExistException.java}       |  16 +-
 ...eption.java => PipeSinkBeingUsedException.java} |  12 +-
 .../sync/PipeSinkNotExistException.java}           |  16 +-
 .../commons/executable/ExecutableManager.java      |  11 +
 .../commons/partition/DataPartitionTable.java      |   4 +-
 .../commons/partition/SchemaPartitionTable.java    |   2 +-
 .../commons/partition/SeriesPartitionTable.java    |   2 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |  13 +
 .../iotdb/commons/sync/metadata/SyncMetadata.java  | 122 +++---
 .../commons/sync/persistence/SyncLogReader.java    |  31 +-
 .../apache/iotdb/commons/sync/pipe/PipeStatus.java |   1 -
 .../iotdb/commons/trigger/TriggerInformation.java  |  15 +
 .../apache/iotdb/commons/trigger/TriggerTable.java |  45 +++
 .../commons/sync/metedata/SyncMetadataTest.java    | 189 +++++++++
 .../resources/conf/iotdb-datanode.properties       |  24 ++
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  43 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  53 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  17 +
 .../db/consensus/DataRegionConsensusImpl.java      |  10 +
 .../db/consensus/SchemaRegionConsensusImpl.java    |  10 +
 .../impl/ReadChunkCompactionPerformer.java         |  21 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  10 +-
 .../iotdb/db/engine/load/AlignedChunkData.java     |   9 +-
 .../org/apache/iotdb/db/engine/load/ChunkData.java |  14 +-
 .../apache/iotdb/db/engine/load/DeletionData.java  |  72 ++++
 .../iotdb/db/engine/load/LoadTsFileManager.java    |  29 +-
 .../iotdb/db/engine/load/NonAlignedChunkData.java  |   1 +
 .../load/{ChunkData.java => TsFileData.java}       |  42 +-
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  11 +-
 .../db/engine/storagegroup/TsFileManager.java      |   5 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   6 +-
 .../storagegroup/timeindex/DeviceTimeIndex.java    |   5 +-
 .../db/mpp/common/header/ColumnHeaderConstant.java |  37 +-
 .../db/mpp/common/header/DatasetHeaderFactory.java |  10 +-
 .../execution/exchange/MPPDataExchangeService.java |   8 +-
 .../execution/executor/RegionWriteExecutor.java    |  36 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  29 ++
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 128 ++++++-
 .../iotdb/db/mpp/plan/analyze/SelectIntoUtils.java | 105 +++++
 .../db/mpp/plan/execution/QueryExecution.java      |  17 +-
 .../plan/execution/config/ConfigTaskVisitor.java   |   9 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  30 +-
 .../config/executor/IConfigTaskExecutor.java       |   4 +-
 .../executor/StandaloneConfigTaskExecutor.java     |   4 +-
 .../{GetRegionTask.java => GetRegionIdTask.java}   |  22 +-
 .../mpp/plan/expression/leaf/ConstantOperand.java  |   4 +-
 .../plan/expression/leaf/TimeSeriesOperand.java    |   9 +
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 111 +++++-
 .../plan/node/load/LoadSingleTsFileNode.java       |  69 +++-
 .../planner/plan/node/load/LoadTsFileNode.java     |   3 +
 .../plan/node/load/LoadTsFilePieceNode.java        |  35 +-
 .../parameter/DeviceViewIntoPathDescriptor.java    | 208 ++++++++++
 .../planner/plan/parameter/IntoPathDescriptor.java | 167 ++++++++
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   7 +
 .../db/mpp/plan/statement/StatementVisitor.java    |   6 +-
 .../plan/statement/component/IntoComponent.java    | 237 ++++++++++++
 .../db/mpp/plan/statement/component/IntoItem.java  |  73 ++++
 .../plan/statement/crud/LoadTsFileStatement.java   |   4 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |  38 ++
 ...ionStatement.java => GetRegionIdStatement.java} |   6 +-
 .../dag/input/ConstantInputReader.java             |   3 +-
 .../transformation/dag/util/TransformUtils.java    |   3 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  34 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |   7 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 -
 .../impl/DataNodeInternalRPCServiceImpl.java       |  52 +++
 .../java/org/apache/iotdb/db/sync/SyncService.java | 170 ++++-----
 .../db/sync/common/ClusterSyncInfoFetcher.java     |   7 +-
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |   4 +-
 .../apache/iotdb/db/sync/common/LocalSyncInfo.java |  22 +-
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java |  11 +-
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |   7 +
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  49 ++-
 .../db/sync/transport/client/SenderManager.java    |   2 +
 .../iotdb/db/trigger/executor/TriggerExecutor.java |  41 +-
 .../db/trigger/executor/TriggerFireResult.java     |  57 +++
 .../db/trigger/executor/TriggerFireVisitor.java    | 423 +++++++++++++++++++++
 .../trigger/service/TriggerManagementService.java  |  85 +++++
 .../apache/iotdb/db/utils/sync/SyncPipeUtil.java   |   2 +-
 .../ReadChunkCompactionPerformerAlignedTest.java   |  79 ++++
 .../db/engine/storagegroup/DataRegionTest.java     |  40 ++
 .../db/mpp/common/schematree/NodeRefTest.java      |  47 +++
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     | 169 ++++++++
 .../manager => persistence}/LocalSyncInfoTest.java |  23 +-
 .../recovery => persistence}/SyncLogTest.java      |  36 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 .../src/main/thrift/confignode.thrift              |  71 ++--
 thrift/src/main/thrift/datanode.thrift             |  30 ++
 .../tsfile/common/constant/TsFileConstant.java     |   4 +
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |  12 +
 .../apache/iotdb/tsfile/write/record/Tablet.java   | 341 ++++++++++++++++-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   4 +
 .../iotdb/tsfile/write/record/TabletTest.java      |  65 ++++
 190 files changed, 6184 insertions(+), 1511 deletions(-)

diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index bc7f9f6ce0,f2a57d462a..488cb0298d
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@@ -475,38 -473,38 +475,68 @@@ public class ConfigNodeDescriptor 
              properties.getProperty(
                  "schema_region_ratis_max_sleep_time_ms",
                  String.valueOf(conf.getSchemaRegionRatisMaxSleepTimeMs()))));
+ 
+     conf.setPartitionRegionRatisPreserveLogsWhenPurge(
+         Long.parseLong(
+             properties.getProperty(
+                 "partition_region_ratis_preserve_logs_num_when_purge",
+                 
String.valueOf(conf.getPartitionRegionRatisPreserveLogsWhenPurge()))));
+ 
+     conf.setSchemaRegionRatisPreserveLogsWhenPurge(
+         Long.parseLong(
+             properties.getProperty(
+                 "schema_region_ratis_preserve_logs_num_when_purge",
+                 
String.valueOf(conf.getSchemaRegionRatisPreserveLogsWhenPurge()))));
+ 
+     conf.setDataRegionRatisPreserveLogsWhenPurge(
+         Long.parseLong(
+             properties.getProperty(
+                 "data_region_ratis_preserve_logs_num_when_purge",
+                 
String.valueOf(conf.getDataRegionRatisPreserveLogsWhenPurge()))));
+ 
+     conf.setRatisFirstElectionTimeoutMinMs(
+         Long.parseLong(
+             properties.getProperty(
+                 "ratis_first_election_timeout_min_ms",
+                 String.valueOf(conf.getRatisFirstElectionTimeoutMinMs()))));
+ 
+     conf.setRatisFirstElectionTimeoutMaxMs(
+         Long.parseLong(
+             properties.getProperty(
+                 "ratis_first_election_timeout_max_ms",
+                 String.valueOf(conf.getRatisFirstElectionTimeoutMaxMs()))));
    }
  
 +  private void loadCQConfig(Properties properties) {
 +    int cqSubmitThread =
 +        Integer.parseInt(
 +            properties.getProperty(
 +                "continuous_query_submit_thread", 
String.valueOf(conf.getCqSubmitThread())));
 +    if (cqSubmitThread <= 0) {
 +      LOGGER.warn(
 +          "continuous_query_submit_thread should be greater than 0, but 
current value is {}, ignore that and use the default value {}",
 +          cqSubmitThread,
 +          conf.getCqSubmitThread());
 +      cqSubmitThread = conf.getCqSubmitThread();
 +    }
 +    conf.setCqSubmitThread(cqSubmitThread);
 +
 +    long cqMinEveryIntervalInMs =
 +        Long.parseLong(
 +            properties.getProperty(
 +                "continuous_query_min_every_interval_in_ms",
 +                String.valueOf(conf.getCqMinEveryIntervalInMs())));
 +    if (cqMinEveryIntervalInMs <= 0) {
 +      LOGGER.warn(
 +          "continuous_query_min_every_interval_in_ms should be greater than 
0, but current value is {}, ignore that and use the default value {}",
 +          cqMinEveryIntervalInMs,
 +          conf.getCqMinEveryIntervalInMs());
 +      cqMinEveryIntervalInMs = conf.getCqMinEveryIntervalInMs();
 +    }
 +
 +    conf.setCqMinEveryIntervalInMs(cqMinEveryIntervalInMs);
 +  }
 +
    /**
     * Check if the current ConfigNode is SeedConfigNode.
     *
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index bf5a438f47,c38ec65dae..b7226dd199
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@@ -304,20 -310,14 +315,28 @@@ public abstract class ConfigPhysicalPla
          case GetSeriesSlotList:
            req = new GetSeriesSlotListPlan();
            break;
+         case UpdateTriggersOnTransferNodes:
+           req = new UpdateTriggersOnTransferNodesPlan();
+           break;
+         case UpdateTriggerLocation:
+           req = new UpdateTriggerLocationPlan();
+           break;
+         case GetTransferringTriggers:
+           req = new GetTransferringTriggersPlan();
 +        case ACTIVE_CQ:
 +          req = new ActiveCQPlan();
 +          break;
 +        case ADD_CQ:
 +          req = new AddCQPlan();
 +          break;
 +        case DROP_CQ:
 +          req = new DropCQPlan();
 +          break;
 +        case UPDATE_CQ_LAST_EXEC_TIME:
 +          req = new UpdateCQLastExecTimePlan();
 +          break;
 +        case SHOW_CQ:
 +          req = new ShowCQPlan();
            break;
          default:
            throw new IOException("unknown PhysicalPlan type: " + typeNum);
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index be8fad06e7,f2492a1766..8bcb95c125
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@@ -89,12 -90,11 +90,16 @@@ public enum ConfigPhysicalPlanType 
    GetTriggerTable,
    UpdateTriggerStateInTable,
    GetTriggerJar,
-   GetRouting,
+   GetRegionId,
    GetSeriesSlotList,
    GetTimeSlotList,
+   UpdateTriggersOnTransferNodes,
+   UpdateTriggerLocation,
+   GetTransferringTriggers,
 -  GetTriggerLocation
++  GetTriggerLocation,
 +  DROP_CQ,
 +  ACTIVE_CQ,
 +  ADD_CQ,
 +  UPDATE_CQ_LAST_EXEC_TIME,
 +  SHOW_CQ
  }
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index eb7973a871,1f76144618..e3625bd3dd
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@@ -94,9 -92,9 +95,10 @@@ import org.apache.iotdb.confignode.rpc.
  import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
  import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 +import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+ import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 3d7e055e41,a0f3ce3041..5007536503
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@@ -55,9 -53,9 +55,10 @@@ import org.apache.iotdb.confignode.rpc.
  import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
  import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 +import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+ import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index 7fc4b3ba8b,e56e4475e1..a30f834235
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@@ -46,7 -59,7 +59,8 @@@ import org.slf4j.LoggerFactory
  import java.io.IOException;
  import java.util.Collections;
  import java.util.List;
+ import java.util.Map;
 +import java.util.Optional;
  
  public class TriggerManager {
    private static final Logger LOGGER = 
LoggerFactory.getLogger(TriggerManager.class);
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2ef6251d36,7162762a87..875017c8f2
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@@ -829,16 -831,35 +842,40 @@@ public class NodeManager 
            }
          });
  
 -    LOGGER.info(
 -        "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", 
result, lowestLoadScore);
 -    return 
configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
 +    if (result.get() == -1) {
 +      return Optional.empty();
 +    } else {
 +      LOGGER.info(
 +          "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", 
result, lowestLoadScore);
 +      return Optional.of(
 +          
configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get()));
 +    }
    }
  
+   /**
+    * Get the DataNodeLocation of the lowest load DataNode in input
+    *
+    * @return TDataNodeLocation
+    */
+   public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
+     AtomicInteger result = new AtomicInteger();
+     AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
+ 
+     nodes.forEach(
+         nodeID -> {
+           BaseNodeCache cache = nodeCacheMap.get(nodeID);
+           long score = (cache == null) ? Long.MAX_VALUE : 
cache.getLoadScore();
+           if (score < lowestLoadScore.get()) {
+             result.set(nodeID);
+             lowestLoadScore.set(score);
+           }
+         });
+ 
+     LOGGER.info(
+         "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", 
result, lowestLoadScore);
+     return 
configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
+   }
+ 
    public boolean isNodeRemoving(int dataNodeId) {
      DataNodeHeartbeatCache cache =
          (DataNodeHeartbeatCache) 
configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
diff --cc 
confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index fd6b13d486,e5815cd695..7af57e0ff8
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@@ -291,15 -294,9 +306,17 @@@ public class ConfigPlanExecutor 
        case PreCreatePipe:
          return syncInfo.preCreatePipe((PreCreatePipePlan) physicalPlan);
        case SetPipeStatus:
-         return syncInfo.operatePipe((SetPipeStatusPlan) physicalPlan);
+         return syncInfo.setPipeStatus((SetPipeStatusPlan) physicalPlan);
+       case DropPipe:
+         return syncInfo.dropPipe((DropPipePlan) physicalPlan);
 +      case ADD_CQ:
 +        return cqInfo.addCQ((AddCQPlan) physicalPlan);
 +      case DROP_CQ:
 +        return cqInfo.dropCQ((DropCQPlan) physicalPlan);
 +      case ACTIVE_CQ:
 +        return cqInfo.activeCQ((ActiveCQPlan) physicalPlan);
 +      case UPDATE_CQ_LAST_EXEC_TIME:
 +        return cqInfo.updateCQLastExecutionTime((UpdateCQLastExecTimePlan) 
physicalPlan);
        default:
          throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
      }
diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6913df7cb0,c17ea193c3..20c624583f
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@@ -1964,12 -1974,14 +1975,18 @@@ public class IoTDBDescriptor 
      
conf.setSchemaRatisConsensusMaxRetryAttempts(ratisConfig.getSchemaMaxRetryAttempts());
      
conf.setSchemaRatisConsensusInitialSleepTimeMs(ratisConfig.getSchemaInitialSleepTime());
      
conf.setSchemaRatisConsensusMaxSleepTimeMs(ratisConfig.getSchemaMaxSleepTime());
+ 
+     
conf.setDataRatisConsensusPreserveWhenPurge(ratisConfig.getDataPreserveWhenPurge());
+     
conf.setSchemaRatisConsensusPreserveWhenPurge(ratisConfig.getSchemaPreserveWhenPurge());
+ 
+     
conf.setRatisFirstElectionTimeoutMinMs(ratisConfig.getFirstElectionTimeoutMin());
+     
conf.setRatisFirstElectionTimeoutMaxMs(ratisConfig.getFirstElectionTimeoutMax());
    }
  
 +  public void loadCQConfig(TCQConfig cqConfig) {
 +    conf.setCqMinEveryIntervalInMs(cqConfig.getCqMinEveryIntervalInMs());
 +  }
 +
    public void reclaimConsensusMemory() {
      conf.setAllocateMemoryForStorageEngine(
          conf.getAllocateMemoryForStorageEngine() + 
conf.getAllocateMemoryForConsensus());
diff --cc 
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 690070bff6,30a2eeb644..d014d1052a
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@@ -157,11 -140,13 +161,15 @@@ import org.apache.iotdb.mpp.rpc.thrift.
  import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
  import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
  import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
+ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
  import org.apache.iotdb.rpc.RpcUtils;
  import org.apache.iotdb.rpc.TSStatusCode;
+ import org.apache.iotdb.trigger.api.enums.FailureStrategy;
+ import org.apache.iotdb.trigger.api.enums.TriggerEvent;
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+ import org.apache.iotdb.tsfile.write.record.Tablet;
  
  import com.google.common.collect.ImmutableList;
  import org.apache.thrift.TException;
diff --cc thrift-confignode/src/main/thrift/confignode.thrift
index c5c5c5f0ac,4af568e3e0..9069d03c82
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@@ -81,12 -80,14 +81,18 @@@ struct TRatisConfig 
    20: required i64 dataInitialSleepTime
    21: required i64 schemaMaxSleepTime
    22: required i64 dataMaxSleepTime
+ 
+   23: required i64 schemaPreserveWhenPurge
+   24: required i64 dataPreserveWhenPurge
+ 
+   25: required i64 firstElectionTimeoutMin
+   26: required i64 firstElectionTimeoutMax
  }
  
 +struct TCQConfig {
 +  1: required i64 cqMinEveryIntervalInMs
 +}
 +
  struct TDataNodeRemoveReq {
    1: required list<common.TDataNodeLocation> dataNodeLocations
  }

Reply via email to