This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0afc1bea507 Pipe: Fix remaining time metrics is not dropped after 
dropPipe(pipeName) & alter pipe does not take effect in non-data regions 
(#12795)
0afc1bea507 is described below

commit 0afc1bea5077afbb7f9c2cb7f942299e06dc0b0b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 25 11:27:40 2024 +0800

    Pipe: Fix remaining time metrics is not dropped after dropPipe(pipeName) & 
alter pipe does not take effect in non-data regions (#12795)
---
 .../impl/pipe/task/AlterPipeProcedureV2.java       | 44 ++++++++++++++--------
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  4 +-
 2 files changed, 31 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 936ad32e01d..6cccbbd7033 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -27,6 +26,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.AlterPipePlanV2;
 import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -125,28 +125,40 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
     final ConcurrentMap<Integer, PipeTaskMeta> 
updatedConsensusGroupIdToTaskMetaMap =
         new ConcurrentHashMap<>();
+    // data regions & schema regions
     env.getConfigManager()
         .getLoadManager()
         .getRegionLeaderMap()
         .forEach(
             (regionGroupId, regionLeaderNodeId) -> {
-              if 
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
-                final String databaseName =
-                    env.getConfigManager()
-                        .getPartitionManager()
-                        .getRegionStorageGroup(regionGroupId);
-                final PipeTaskMeta currentPipeTaskMeta =
-                    
currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId());
-                if (databaseName != null
-                    && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
-                    && currentPipeTaskMeta.getLeaderNodeId() == 
regionLeaderNodeId) {
-                  // Pipe only collect user's data, filter metric database 
here.
-                  updatedConsensusGroupIdToTaskMetaMap.put(
-                      regionGroupId.getId(),
-                      new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), 
regionLeaderNodeId));
-                }
+              final String databaseName =
+                  
env.getConfigManager().getPartitionManager().getRegionStorageGroup(regionGroupId);
+              final PipeTaskMeta currentPipeTaskMeta =
+                  
currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId());
+              if (databaseName != null
+                  && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
+                  && currentPipeTaskMeta.getLeaderNodeId() == 
regionLeaderNodeId) {
+                // Pipe only collects user data, so the metric database is filtered out here.
+                updatedConsensusGroupIdToTaskMetaMap.put(
+                    regionGroupId.getId(),
+                    new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), 
regionLeaderNodeId));
               }
             });
+
+    final PipeTaskMeta configRegionTaskMeta =
+        currentConsensusGroupId2PipeTaskMeta.get(Integer.MIN_VALUE);
+    if (Objects.nonNull(configRegionTaskMeta)) {
+      // config region
+      updatedConsensusGroupIdToTaskMetaMap.put(
+          // 0 is the consensus group id of the config region, but data region ids and schema
+          // region ids also start from 0, so we use Integer.MIN_VALUE to represent the config
+          // region
+          Integer.MIN_VALUE,
+          new PipeTaskMeta(
+              configRegionTaskMeta.getProgressIndex(),
+              // The leader of the config region is the config node itself
+              ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
+    }
     updatedPipeRuntimeMeta = new 
PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap);
     // If the pipe's previous status was user stopped, then after the alter 
operation, the pipe's
     // status remains user stopped; otherwise, it becomes running.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0255dc05c35..f1fd633c769 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -262,11 +262,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   @Override
   protected boolean dropPipe(final String pipeName) {
+    // Get the pipe meta first because it is removed after 
super#dropPipe(pipeName)
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
     if (!super.dropPipe(pipeName)) {
       return false;
     }
 
-    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     if (Objects.nonNull(pipeMeta)) {
       final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()

Reply via email to