This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-4619 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9ee5295fe61a69d702b381c5febe6436a3e14a20 Merge: 58289d6d0a d0d2ec30a8 Author: JackieTien97 <[email protected]> AuthorDate: Thu Oct 20 14:02:01 2022 +0800 resolve conflicts .github/workflows/standalone-it-for-mpp.yml | 22 +- .gitignore | 1 + .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 20 +- .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 1 + .../main/java/org/apache/iotdb/tool/ImportCsv.java | 20 +- .../resources/conf/iotdb-confignode.properties | 13 +- .../confignode/client/DataNodeRequestType.java | 1 + .../client/async/AsyncDataNodeClientPool.java | 7 + .../client/async/handlers/AsyncClientHandler.java | 1 + .../iotdb/confignode/conf/ConfigNodeConfig.java | 50 +++ .../confignode/conf/ConfigNodeDescriptor.java | 32 +- .../confignode/conf/ConfigNodeRemoveCheck.java | 4 +- .../confignode/conf/SystemPropertiesUtils.java | 2 +- .../consensus/request/ConfigPhysicalPlan.java | 25 +- .../consensus/request/ConfigPhysicalPlanType.java | 7 +- .../{GetRoutingPlan.java => GetRegionIdPlan.java} | 10 +- ...ePlan.java => GetTransferringTriggersPlan.java} | 8 +- ...rTablePlan.java => GetTriggerLocationPlan.java} | 30 +- .../request/read/GetTriggerTablePlan.java | 22 +- .../sync/DropPipePlan.java} | 28 +- .../write/trigger/UpdateTriggerLocationPlan.java | 76 ++++ .../trigger/UpdateTriggersOnTransferNodesPlan.java | 75 ++++ .../{GetRoutingResp.java => GetRegionIdResp.java} | 10 +- .../response/TransferringTriggersResp.java | 26 +- ...etRoutingResp.java => TriggerLocationResp.java} | 30 +- .../iotdb/confignode/manager/ConfigManager.java | 61 ++- .../iotdb/confignode/manager/ConsensusManager.java | 12 +- .../apache/iotdb/confignode/manager/IManager.java | 13 +- .../iotdb/confignode/manager/SyncManager.java | 8 +- .../iotdb/confignode/manager/TriggerManager.java | 105 ++++- .../iotdb/confignode/manager/load/LoadManager.java | 6 +- .../manager/load/balancer/RouteBalancer.java | 6 +- .../manager/load/balancer/router/IRouter.java | 2 +- .../load/balancer/router/LazyGreedyRouter.java | 2 +- .../manager/load/balancer/router/LeaderRouter.java | 2 +- .../balancer/router/LoadScoreGreedyRouter.java | 2 +- .../iotdb/confignode/manager/node/NodeManager.java | 43 ++- .../manager/partition/PartitionManager.java | 8 +- .../iotdb/confignode/persistence/TriggerInfo.java | 67 +++- .../persistence/executor/ConfigPlanExecutor.java | 27 +- .../persistence/partition/PartitionInfo.java | 12 +- .../partition/StorageGroupPartitionTable.java | 6 +- .../persistence/sync/ClusterSyncInfo.java | 43 +-- .../confignode/procedure/ProcedureExecutor.java | 2 +- .../procedure/env/ConfigNodeProcedureEnv.java | 3 - .../procedure/impl/CreateTriggerProcedure.java | 5 +- .../impl/sync/AbstractOperatePipeProcedure.java | 6 +- .../procedure/impl/sync/CreatePipeProcedure.java | 3 +- .../procedure/impl/sync/DropPipeProcedure.java | 12 +- .../iotdb/confignode/service/ConfigNode.java | 7 +- .../confignode/service/ConfigNodeCommandLine.java | 5 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 26 +- .../request/ConfigPhysicalPlanSerDeTest.java | 98 ++++- .../load/balancer/router/LazyGreedyRouterTest.java | 8 +- .../load/balancer/router/LeaderRouterTest.java | 6 +- .../balancer/router/LoadScoreGreedyRouterTest.java | 2 +- .../confignode1conf/iotdb-confignode.properties | 3 +- .../confignode2conf/iotdb-confignode.properties | 3 +- .../confignode3conf/iotdb-confignode.properties | 1 + consensus/pom.xml | 2 +- .../apache/iotdb/consensus/config/RatisConfig.java | 55 ++- .../multileader/logdispatcher/LogDispatcher.java | 18 +- .../multileader/logdispatcher/PendingBatch.java | 11 +- .../ratis/FileInfoWithDelayedMd5Computing.java | 75 ---- .../iotdb/consensus/ratis/RatisConsensus.java | 7 +- .../iotdb/consensus/ratis/SnapshotStorage.java | 2 +- .../org/apache/iotdb/consensus/ratis/Utils.java | 8 + .../multileader/logdispatcher/SyncStatusTest.java | 25 +- .../apache/iotdb/consensus/ratis/SnapshotTest.java | 3 + docs/UserGuide/Cluster/Cluster-Concept.md | 18 +- docs/UserGuide/Cluster/Cluster-Setup.md | 12 +- docs/UserGuide/Delete-Data/TTL.md | 16 +- .../Maintenance-Tools/Maintenance-Command.md | 100 ++++- docs/zh/UserGuide/Cluster/Cluster-Setup.md | 12 +- docs/zh/UserGuide/Delete-Data/TTL.md | 18 +- .../Maintenance-Tools/Maintenance-Command.md | 99 ++++- .../iotdb/trigger/ClusterAlertingExample.java | 8 +- .../org/apache/iotdb/trigger/LoggerTrigger.java | 86 +++++ integration-test/import-control.xml | 2 +- .../trigger/example/TriggerFireTimesCounter.java | 87 +++++ .../java/org/apache/iotdb/it/env/MppConfig.java | 11 +- .../org/apache/iotdb/it/utils/TsFileGenerator.java | 232 +++++++++++ .../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +- .../confignode/it/IoTDBClusterPartitionIT.java | 63 +-- .../confignode/it/IoTDBConfigNodeSnapshotIT.java | 4 +- .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 340 +++++++---------- .../java/org/apache/iotdb/db/it/IoTDBFilterIT.java | 24 +- .../org/apache/iotdb/db/it/sync/IoTDBPipeIT.java | 69 +++- .../apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java | 7 +- .../db/it/trigger/IoTDBTriggerExecutionIT.java | 265 +++++++++++++ .../iotdb/session/it/IoTDBSessionInsertNulIT.java | 2 +- .../IoTDBSessionInsertWithTriggerExecutionIT.java | 337 ++++++++++++++++ .../src/test/resources/TriggerFireTimesCounter.jar | Bin 0 -> 1324 bytes ...IoTDBLoadExternalTsFileWithTimePartitionIT.java | 329 ---------------- .../apache/iotdb/commons/conf/IoTDBConstant.java | 5 + ...ception.java => PipeAlreadyExistException.java} | 12 +- .../exception/sync/PipeNotExistException.java | 4 - .../sync/PipeSinkAlreadyExistException.java} | 16 +- ...eption.java => PipeSinkBeingUsedException.java} | 12 +- .../sync/PipeSinkNotExistException.java} | 16 +- .../commons/executable/ExecutableManager.java | 11 + .../commons/partition/DataPartitionTable.java | 4 +- .../commons/partition/SchemaPartitionTable.java | 2 +- .../commons/partition/SeriesPartitionTable.java | 2 +- .../org/apache/iotdb/commons/path/PartialPath.java | 13 + .../iotdb/commons/sync/metadata/SyncMetadata.java | 122 +++--- .../commons/sync/persistence/SyncLogReader.java | 31 +- .../apache/iotdb/commons/sync/pipe/PipeStatus.java | 1 - .../iotdb/commons/trigger/TriggerInformation.java | 15 + .../apache/iotdb/commons/trigger/TriggerTable.java | 45 +++ .../commons/sync/metedata/SyncMetadataTest.java | 189 +++++++++ .../resources/conf/iotdb-datanode.properties | 24 ++ .../apache/iotdb/db/client/ConfigNodeClient.java | 43 ++- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 53 +++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 17 + .../db/consensus/DataRegionConsensusImpl.java | 10 + .../db/consensus/SchemaRegionConsensusImpl.java | 10 + .../impl/ReadChunkCompactionPerformer.java | 21 +- .../iotdb/db/engine/flush/MemTableFlushTask.java | 10 +- .../iotdb/db/engine/load/AlignedChunkData.java | 9 +- .../org/apache/iotdb/db/engine/load/ChunkData.java | 14 +- .../apache/iotdb/db/engine/load/DeletionData.java | 72 ++++ .../iotdb/db/engine/load/LoadTsFileManager.java | 29 +- .../iotdb/db/engine/load/NonAlignedChunkData.java | 1 + .../load/{ChunkData.java => TsFileData.java} | 42 +- .../iotdb/db/engine/storagegroup/DataRegion.java | 11 +- .../db/engine/storagegroup/TsFileManager.java | 5 +- .../db/engine/storagegroup/TsFileProcessor.java | 6 +- .../storagegroup/timeindex/DeviceTimeIndex.java | 5 +- .../db/mpp/common/header/ColumnHeaderConstant.java | 37 +- .../db/mpp/common/header/DatasetHeaderFactory.java | 10 +- .../execution/exchange/MPPDataExchangeService.java | 8 +- .../execution/executor/RegionWriteExecutor.java | 36 +- .../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 29 ++ .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 128 ++++++- .../iotdb/db/mpp/plan/analyze/SelectIntoUtils.java | 105 +++++ .../db/mpp/plan/execution/QueryExecution.java | 17 +- .../plan/execution/config/ConfigTaskVisitor.java | 9 +- .../config/executor/ClusterConfigTaskExecutor.java | 30 +- .../config/executor/IConfigTaskExecutor.java | 4 +- .../executor/StandaloneConfigTaskExecutor.java | 4 +- .../{GetRegionTask.java => GetRegionIdTask.java} | 22 +- .../mpp/plan/expression/leaf/ConstantOperand.java | 4 +- .../plan/expression/leaf/TimeSeriesOperand.java | 9 + .../iotdb/db/mpp/plan/parser/ASTVisitor.java | 111 +++++- .../plan/node/load/LoadSingleTsFileNode.java | 69 +++- .../planner/plan/node/load/LoadTsFileNode.java | 3 + .../plan/node/load/LoadTsFilePieceNode.java | 35 +- .../parameter/DeviceViewIntoPathDescriptor.java | 208 ++++++++++ .../planner/plan/parameter/IntoPathDescriptor.java | 167 ++++++++ .../plan/scheduler/load/LoadTsFileScheduler.java | 7 + .../db/mpp/plan/statement/StatementVisitor.java | 6 +- .../plan/statement/component/IntoComponent.java | 237 ++++++++++++ .../db/mpp/plan/statement/component/IntoItem.java | 73 ++++ .../plan/statement/crud/LoadTsFileStatement.java | 4 +- .../db/mpp/plan/statement/crud/QueryStatement.java | 38 ++ ...ionStatement.java => GetRegionIdStatement.java} | 6 +- .../dag/input/ConstantInputReader.java | 3 +- .../transformation/dag/util/TransformUtils.java | 3 +- .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 34 +- .../org/apache/iotdb/db/rescon/SystemInfo.java | 7 +- .../java/org/apache/iotdb/db/service/DataNode.java | 4 - .../impl/DataNodeInternalRPCServiceImpl.java | 52 +++ .../java/org/apache/iotdb/db/sync/SyncService.java | 170 ++++----- .../db/sync/common/ClusterSyncInfoFetcher.java | 7 +- .../iotdb/db/sync/common/ISyncInfoFetcher.java | 4 +- .../apache/iotdb/db/sync/common/LocalSyncInfo.java | 22 +- .../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 11 +- .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 7 + .../iotdb/db/sync/sender/pipe/TsFilePipe.java | 49 ++- .../db/sync/transport/client/SenderManager.java | 2 + .../iotdb/db/trigger/executor/TriggerExecutor.java | 41 +- .../db/trigger/executor/TriggerFireResult.java | 57 +++ .../db/trigger/executor/TriggerFireVisitor.java | 423 +++++++++++++++++++++ .../trigger/service/TriggerManagementService.java | 85 +++++ .../apache/iotdb/db/utils/sync/SyncPipeUtil.java | 2 +- .../ReadChunkCompactionPerformerAlignedTest.java | 79 ++++ .../db/engine/storagegroup/DataRegionTest.java | 40 ++ .../db/mpp/common/schematree/NodeRefTest.java | 47 +++ .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java | 169 ++++++++ .../manager => persistence}/LocalSyncInfoTest.java | 23 +- .../recovery => persistence}/SyncLogTest.java | 36 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 + .../src/main/thrift/confignode.thrift | 71 ++-- thrift/src/main/thrift/datanode.thrift | 30 ++ .../tsfile/common/constant/TsFileConstant.java | 4 + .../tsfile/write/chunk/AlignedChunkWriterImpl.java | 12 + .../apache/iotdb/tsfile/write/record/Tablet.java | 341 ++++++++++++++++- .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 4 + .../iotdb/tsfile/write/record/TabletTest.java | 65 ++++ 190 files changed, 6184 insertions(+), 1511 deletions(-) diff --cc confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index bc7f9f6ce0,f2a57d462a..488cb0298d --- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@@ -475,38 -473,38 +475,68 @@@ public class ConfigNodeDescriptor properties.getProperty( "schema_region_ratis_max_sleep_time_ms", String.valueOf(conf.getSchemaRegionRatisMaxSleepTimeMs())))); + + conf.setPartitionRegionRatisPreserveLogsWhenPurge( + Long.parseLong( + properties.getProperty( + "partition_region_ratis_preserve_logs_num_when_purge", + String.valueOf(conf.getPartitionRegionRatisPreserveLogsWhenPurge())))); + + conf.setSchemaRegionRatisPreserveLogsWhenPurge( + Long.parseLong( + properties.getProperty( + "schema_region_ratis_preserve_logs_num_when_purge", + String.valueOf(conf.getSchemaRegionRatisPreserveLogsWhenPurge())))); + + conf.setDataRegionRatisPreserveLogsWhenPurge( + Long.parseLong( + properties.getProperty( + "data_region_ratis_preserve_logs_num_when_purge", + String.valueOf(conf.getDataRegionRatisPreserveLogsWhenPurge())))); + + conf.setRatisFirstElectionTimeoutMinMs( + Long.parseLong( + properties.getProperty( + "ratis_first_election_timeout_min_ms", + String.valueOf(conf.getRatisFirstElectionTimeoutMinMs())))); + + conf.setRatisFirstElectionTimeoutMaxMs( + Long.parseLong( + properties.getProperty( + "ratis_first_election_timeout_max_ms", + String.valueOf(conf.getRatisFirstElectionTimeoutMaxMs())))); } + private void loadCQConfig(Properties properties) { + int cqSubmitThread = + Integer.parseInt( + properties.getProperty( + "continuous_query_submit_thread", String.valueOf(conf.getCqSubmitThread()))); + if (cqSubmitThread <= 0) { + LOGGER.warn( + "continuous_query_submit_thread should be greater than 0, but current value is {}, ignore that and use the default value {}", + cqSubmitThread, + conf.getCqSubmitThread()); + cqSubmitThread = conf.getCqSubmitThread(); + } + conf.setCqSubmitThread(cqSubmitThread); + + long cqMinEveryIntervalInMs = + Long.parseLong( + properties.getProperty( + "continuous_query_min_every_interval_in_ms", + String.valueOf(conf.getCqMinEveryIntervalInMs()))); + if (cqMinEveryIntervalInMs <= 0) { + LOGGER.warn( + "continuous_query_min_every_interval_in_ms should be greater than 0, but current value is {}, ignore that and use the default value {}", + cqMinEveryIntervalInMs, + conf.getCqMinEveryIntervalInMs()); + cqMinEveryIntervalInMs = conf.getCqMinEveryIntervalInMs(); + } + + conf.setCqMinEveryIntervalInMs(cqMinEveryIntervalInMs); + } + /** * Check if the current ConfigNode is SeedConfigNode. * diff --cc confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index bf5a438f47,c38ec65dae..b7226dd199 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@@ -304,20 -310,14 +315,28 @@@ public abstract class ConfigPhysicalPla case GetSeriesSlotList: req = new GetSeriesSlotListPlan(); break; + case UpdateTriggersOnTransferNodes: + req = new UpdateTriggersOnTransferNodesPlan(); + break; + case UpdateTriggerLocation: + req = new UpdateTriggerLocationPlan(); + break; + case GetTransferringTriggers: + req = new GetTransferringTriggersPlan(); + case ACTIVE_CQ: + req = new ActiveCQPlan(); + break; + case ADD_CQ: + req = new AddCQPlan(); + break; + case DROP_CQ: + req = new DropCQPlan(); + break; + case UPDATE_CQ_LAST_EXEC_TIME: + req = new UpdateCQLastExecTimePlan(); + break; + case SHOW_CQ: + req = new ShowCQPlan(); break; default: throw new IOException("unknown PhysicalPlan type: " + typeNum); diff --cc confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index be8fad06e7,f2492a1766..8bcb95c125 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@@ -89,12 -90,11 +90,16 @@@ public enum ConfigPhysicalPlanType GetTriggerTable, UpdateTriggerStateInTable, GetTriggerJar, - GetRouting, + GetRegionId, GetSeriesSlotList, GetTimeSlotList, + UpdateTriggersOnTransferNodes, + UpdateTriggerLocation, + GetTransferringTriggers, - GetTriggerLocation ++ GetTriggerLocation, + DROP_CQ, + ACTIVE_CQ, + ADD_CQ, + UPDATE_CQ_LAST_EXEC_TIME, + SHOW_CQ } diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index eb7973a871,1f76144618..e3625bd3dd --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@@ -94,9 -92,9 +95,10 @@@ import org.apache.iotdb.confignode.rpc. import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp; + import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp; diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 3d7e055e41,a0f3ce3041..5007536503 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@@ -55,9 -53,9 +55,10 @@@ import org.apache.iotdb.confignode.rpc. import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp; + import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp; diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java index 7fc4b3ba8b,e56e4475e1..a30f834235 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java @@@ -46,7 -59,7 +59,8 @@@ import org.slf4j.LoggerFactory import java.io.IOException; import java.util.Collections; import java.util.List; + import java.util.Map; +import java.util.Optional; public class TriggerManager { private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class); diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 2ef6251d36,7162762a87..875017c8f2 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@@ -829,16 -831,35 +842,40 @@@ public class NodeManager } }); - LOGGER.info( - "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore); - return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get()); + if (result.get() == -1) { + return Optional.empty(); + } else { + LOGGER.info( + "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore); + return Optional.of( + configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get())); + } } + /** + * Get the DataNodeLocation of the lowest load DataNode in input + * + * @return TDataNodeLocation + */ + public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) { + AtomicInteger result = new AtomicInteger(); + AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE); + + nodes.forEach( + nodeID -> { + BaseNodeCache cache = nodeCacheMap.get(nodeID); + long score = (cache == null) ? Long.MAX_VALUE : cache.getLoadScore(); + if (score < lowestLoadScore.get()) { + result.set(nodeID); + lowestLoadScore.set(score); + } + }); + + LOGGER.info( + "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore); + return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get()); + } + public boolean isNodeRemoving(int dataNodeId) { DataNodeHeartbeatCache cache = (DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(dataNodeId); diff --cc confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index fd6b13d486,e5815cd695..7af57e0ff8 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@@ -291,15 -294,9 +306,17 @@@ public class ConfigPlanExecutor case PreCreatePipe: return syncInfo.preCreatePipe((PreCreatePipePlan) physicalPlan); case SetPipeStatus: - return syncInfo.operatePipe((SetPipeStatusPlan) physicalPlan); + return syncInfo.setPipeStatus((SetPipeStatusPlan) physicalPlan); + case DropPipe: + return syncInfo.dropPipe((DropPipePlan) physicalPlan); + case ADD_CQ: + return cqInfo.addCQ((AddCQPlan) physicalPlan); + case DROP_CQ: + return cqInfo.dropCQ((DropCQPlan) physicalPlan); + case ACTIVE_CQ: + return cqInfo.activeCQ((ActiveCQPlan) physicalPlan); + case UPDATE_CQ_LAST_EXEC_TIME: + return cqInfo.updateCQLastExecutionTime((UpdateCQLastExecTimePlan) physicalPlan); default: throw new UnknownPhysicalPlanTypeException(physicalPlan.getType()); } diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 6913df7cb0,c17ea193c3..20c624583f --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@@ -1964,12 -1974,14 +1975,18 @@@ public class IoTDBDescriptor conf.setSchemaRatisConsensusMaxRetryAttempts(ratisConfig.getSchemaMaxRetryAttempts()); conf.setSchemaRatisConsensusInitialSleepTimeMs(ratisConfig.getSchemaInitialSleepTime()); conf.setSchemaRatisConsensusMaxSleepTimeMs(ratisConfig.getSchemaMaxSleepTime()); + + conf.setDataRatisConsensusPreserveWhenPurge(ratisConfig.getDataPreserveWhenPurge()); + conf.setSchemaRatisConsensusPreserveWhenPurge(ratisConfig.getSchemaPreserveWhenPurge()); + + conf.setRatisFirstElectionTimeoutMinMs(ratisConfig.getFirstElectionTimeoutMin()); + conf.setRatisFirstElectionTimeoutMaxMs(ratisConfig.getFirstElectionTimeoutMax()); } + public void loadCQConfig(TCQConfig cqConfig) { + conf.setCqMinEveryIntervalInMs(cqConfig.getCqMinEveryIntervalInMs()); + } + public void reclaimConsensusMemory() { conf.setAllocateMemoryForStorageEngine( conf.getAllocateMemoryForStorageEngine() + conf.getAllocateMemoryForConsensus()); diff --cc server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java index 690070bff6,30a2eeb644..d014d1052a --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java @@@ -157,11 -140,13 +161,15 @@@ import org.apache.iotdb.mpp.rpc.thrift. import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq; import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq; + import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; + import org.apache.iotdb.trigger.api.enums.FailureStrategy; + import org.apache.iotdb.trigger.api.enums.TriggerEvent; import org.apache.iotdb.tsfile.exception.NotImplementedException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; + import org.apache.iotdb.tsfile.write.record.Tablet; import com.google.common.collect.ImmutableList; import org.apache.thrift.TException; diff --cc thrift-confignode/src/main/thrift/confignode.thrift index c5c5c5f0ac,4af568e3e0..9069d03c82 --- a/thrift-confignode/src/main/thrift/confignode.thrift +++ b/thrift-confignode/src/main/thrift/confignode.thrift @@@ -81,12 -80,14 +81,18 @@@ struct TRatisConfig 20: required i64 dataInitialSleepTime 21: required i64 schemaMaxSleepTime 22: required i64 dataMaxSleepTime + + 23: required i64 schemaPreserveWhenPurge + 24: required i64 dataPreserveWhenPurge + + 25: required i64 firstElectionTimeoutMin + 26: required i64 firstElectionTimeoutMax } +struct TCQConfig { + 1: required i64 cqMinEveryIntervalInMs +} + struct TDataNodeRemoveReq { 1: required list<common.TDataNodeLocation> dataNodeLocations }
