This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5c10695f032 Pipe: filter out databases whose name starts with `root.__system.` (#12957)
5c10695f032 is described below
commit 5c10695f032e813220d2c71e9ca3e1a00b9a24b4
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 18 10:16:39 2024 +0800
Pipe: filter out databases whose name starts with `root.__system.` (#12957)
---
.../manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java | 5 ++++-
.../confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java | 6 ++++--
.../confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java | 4 +++-
3 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
index a3cb82ebf52..284779fcb03 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
@@ -34,6 +34,7 @@ import org.apache.tsfile.utils.Pair;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
public class PipeLeaderChangeHandler implements IClusterStatusSubscriber {
@@ -85,7 +86,9 @@ public class PipeLeaderChangeHandler implements IClusterStatusSubscriber {
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
// Pipe only collect user's data, filter metric database here.
// DatabaseName may be null for config region group
- if (!SchemaConstant.SYSTEM_DATABASE.equals(databaseName)) {
+ if (Objects.isNull(databaseName)
+ || (!databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
+ &&
!databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + "."))) {
// null or -1 means empty origin leader
final int oldLeaderNodeId = (pair.left == null ? -1 :
pair.left.getLeaderId());
final int newLeaderNodeId = (pair.right == null ? -1 :
pair.right.getLeaderId());
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index e59eb3a1896..bfd35a3bc96 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -147,6 +147,7 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId());
if (databaseName != null
&& !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
+ && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE +
".")
&& currentPipeTaskMeta.getLeaderNodeId() ==
regionLeaderNodeId) {
// Pipe only collect user's data, filter metric database here.
updatedConsensusGroupIdToTaskMetaMap.put(
@@ -161,15 +162,16 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
// config region
updatedConsensusGroupIdToTaskMetaMap.put(
// 0 is the consensus group id of the config region, but data region
id and schema region
- // id
- // also start from 0, so we use Integer.MIN_VALUE to represent the
config region
+ // id also start from 0, so we use Integer.MIN_VALUE to represent
the config region
Integer.MIN_VALUE,
new PipeTaskMeta(
configRegionTaskMeta.getProgressIndex(),
// The leader of the config region is the config node itself
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
}
+
updatedPipeRuntimeMeta = new
PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap);
+
// If the pipe's previous status was user stopped, then after the alter
operation, the pipe's
// status remains user stopped; otherwise, it becomes running.
if
(!pipeTaskInfo.get().isPipeStoppedByUser(alterPipeRequest.getPipeName())) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index bbc9d7b2298..68075b5911d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -172,7 +172,9 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
env.getConfigManager()
.getPartitionManager()
.getRegionStorageGroup(regionGroupId);
- if (databaseName != null &&
!databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) {
+ if (databaseName != null
+ && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
+ && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE
+ ".")) {
// Pipe only collect user's data, filter out metric database
here.
consensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),