This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-filter-system-databases
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 01248bdcd4bd741a7db5c12b825da12618fe32d0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jul 17 15:55:48 2024 +0800

    Pipe: filter out databases whose names start with, but are not equal to, `root.__system`
---
 .../manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java  | 4 +++-
 .../confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java  | 7 ++++---
 .../confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java | 3 ++-
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
index a3cb82ebf52..f2d998674cc 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
@@ -34,6 +34,7 @@ import org.apache.tsfile.utils.Pair;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 public class PipeLeaderChangeHandler implements IClusterStatusSubscriber {
 
@@ -85,7 +86,8 @@ public class PipeLeaderChangeHandler implements IClusterStatusSubscriber {
                   configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
               // Pipe only collect user's data, filter metric database here.
               // DatabaseName may be null for config region group
-              if (!SchemaConstant.SYSTEM_DATABASE.equals(databaseName)) {
+              if (Objects.nonNull(databaseName)
+                  && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
                 // null or -1 means empty origin leader
                 final int oldLeaderNodeId = (pair.left == null ? -1 : pair.left.getLeaderId());
                 final int newLeaderNodeId = (pair.right == null ? -1 : pair.right.getLeaderId());
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 6cccbbd7033..78b4469eb7e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -136,7 +136,7 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
               final PipeTaskMeta currentPipeTaskMeta =
                   currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId());
               if (databaseName != null
-                  && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
+                  && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)
                   && currentPipeTaskMeta.getLeaderNodeId() == regionLeaderNodeId) {
                 // Pipe only collect user's data, filter metric database here.
                 updatedConsensusGroupIdToTaskMetaMap.put(
@@ -151,15 +151,16 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
       // config region
       updatedConsensusGroupIdToTaskMetaMap.put(
           // 0 is the consensus group id of the config region, but data region id and schema region
-          // id
-          // also start from 0, so we use Integer.MIN_VALUE to represent the config region
+          // id also start from 0, so we use Integer.MIN_VALUE to represent the config region
           Integer.MIN_VALUE,
           new PipeTaskMeta(
               configRegionTaskMeta.getProgressIndex(),
               // The leader of the config region is the config node itself
               ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
     }
+
     updatedPipeRuntimeMeta = new PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap);
+
     // If the pipe's previous status was user stopped, then after the alter operation, the pipe's
     // status remains user stopped; otherwise, it becomes running.
     if (!pipeTaskInfo.get().isPipeStoppedByUser(alterPipeRequest.getPipeName())) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index bbc9d7b2298..fd07f57ad0a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -172,7 +172,8 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
                     env.getConfigManager()
                         .getPartitionManager()
                         .getRegionStorageGroup(regionGroupId);
-                if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) {
+                if (databaseName != null
+                    && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
                   // Pipe only collect user's data, filter out metric database here.
                   consensusGroupIdToTaskMetaMap.put(
                       regionGroupId.getId(),

Reply via email to