This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-filter-system-databases in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 01248bdcd4bd741a7db5c12b825da12618fe32d0 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jul 17 15:55:48 2024 +0800 Pipe: filter out databases whose name starts with but not equals to `root.__system` --- .../manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java | 4 +++- .../confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java | 7 ++++--- .../confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java | 3 ++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java index a3cb82ebf52..f2d998674cc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java @@ -34,6 +34,7 @@ import org.apache.tsfile.utils.Pair; import java.util.HashMap; import java.util.Map; +import java.util.Objects; public class PipeLeaderChangeHandler implements IClusterStatusSubscriber { @@ -85,7 +86,8 @@ public class PipeLeaderChangeHandler implements IClusterStatusSubscriber { configManager.getPartitionManager().getRegionStorageGroup(regionGroupId); // Pipe only collect user's data, filter metric database here. // DatabaseName may be null for config region group - if (!SchemaConstant.SYSTEM_DATABASE.equals(databaseName)) { + if (Objects.nonNull(databaseName) + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) { // null or -1 means empty origin leader final int oldLeaderNodeId = (pair.left == null ? -1 : pair.left.getLeaderId()); final int newLeaderNodeId = (pair.right == null ? -1 : pair.right.getLeaderId()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java index 6cccbbd7033..78b4469eb7e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java @@ -136,7 +136,7 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { final PipeTaskMeta currentPipeTaskMeta = currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId()); if (databaseName != null - && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE) + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE) && currentPipeTaskMeta.getLeaderNodeId() == regionLeaderNodeId) { // Pipe only collect user's data, filter metric database here. updatedConsensusGroupIdToTaskMetaMap.put( @@ -151,15 +151,16 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { // config region updatedConsensusGroupIdToTaskMetaMap.put( // 0 is the consensus group id of the config region, but data region id and schema region - // id - // also start from 0, so we use Integer.MIN_VALUE to represent the config region + // id also start from 0, so we use Integer.MIN_VALUE to represent the config region Integer.MIN_VALUE, new PipeTaskMeta( configRegionTaskMeta.getProgressIndex(), // The leader of the config region is the config node itself ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId())); } + updatedPipeRuntimeMeta = new PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap); + // If the pipe's previous status was user stopped, then after the alter operation, the pipe's // status remains user stopped; otherwise, it becomes running. if (!pipeTaskInfo.get().isPipeStoppedByUser(alterPipeRequest.getPipeName())) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index bbc9d7b2298..fd07f57ad0a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -172,7 +172,8 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { env.getConfigManager() .getPartitionManager() .getRegionStorageGroup(regionGroupId); - if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) { + if (databaseName != null + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) { // Pipe only collect user's data, filter out metric database here. consensusGroupIdToTaskMetaMap.put( regionGroupId.getId(),
