This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0afc1bea507 Pipe: Fix remaining time metrics not being dropped after
dropPipe(pipeName) & alter pipe not taking effect in non-data regions
(#12795)
0afc1bea507 is described below
commit 0afc1bea5077afbb7f9c2cb7f942299e06dc0b0b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 25 11:27:40 2024 +0800
Pipe: Fix remaining time metrics not being dropped after dropPipe(pipeName) &
alter pipe not taking effect in non-data regions (#12795)
---
.../impl/pipe/task/AlterPipeProcedureV2.java | 44 ++++++++++++++--------
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 4 +-
2 files changed, 31 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 936ad32e01d..6cccbbd7033 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -27,6 +26,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.AlterPipePlanV2;
import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -125,28 +125,40 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
final ConcurrentMap<Integer, PipeTaskMeta>
updatedConsensusGroupIdToTaskMetaMap =
new ConcurrentHashMap<>();
+ // data regions & schema regions
env.getConfigManager()
.getLoadManager()
.getRegionLeaderMap()
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
- if
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
- final String databaseName =
- env.getConfigManager()
- .getPartitionManager()
- .getRegionStorageGroup(regionGroupId);
- final PipeTaskMeta currentPipeTaskMeta =
-
currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId());
- if (databaseName != null
- && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
- && currentPipeTaskMeta.getLeaderNodeId() ==
regionLeaderNodeId) {
- // Pipe only collect user's data, filter metric database
here.
- updatedConsensusGroupIdToTaskMetaMap.put(
- regionGroupId.getId(),
- new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(),
regionLeaderNodeId));
- }
+ final String databaseName =
+
env.getConfigManager().getPartitionManager().getRegionStorageGroup(regionGroupId);
+ final PipeTaskMeta currentPipeTaskMeta =
+
currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId());
+ if (databaseName != null
+ && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
+ && currentPipeTaskMeta.getLeaderNodeId() ==
regionLeaderNodeId) {
+ // Pipe only collect user's data, filter metric database here.
+ updatedConsensusGroupIdToTaskMetaMap.put(
+ regionGroupId.getId(),
+ new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(),
regionLeaderNodeId));
}
});
+
+ final PipeTaskMeta configRegionTaskMeta =
+ currentConsensusGroupId2PipeTaskMeta.get(Integer.MIN_VALUE);
+ if (Objects.nonNull(configRegionTaskMeta)) {
+ // config region
+ updatedConsensusGroupIdToTaskMetaMap.put(
+ // 0 is the consensus group id of the config region, but data region
id and schema region
+ // id
+ // also start from 0, so we use Integer.MIN_VALUE to represent the
config region
+ Integer.MIN_VALUE,
+ new PipeTaskMeta(
+ configRegionTaskMeta.getProgressIndex(),
+ // The leader of the config region is the config node itself
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
+ }
updatedPipeRuntimeMeta = new
PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap);
// If the pipe's previous status was user stopped, then after the alter
operation, the pipe's
// status remains user stopped; otherwise, it becomes running.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0255dc05c35..f1fd633c769 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -262,11 +262,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
@Override
protected boolean dropPipe(final String pipeName) {
+ // Get the pipe meta first because it is removed after
super#dropPipe(pipeName)
+ final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
if (!super.dropPipe(pipeName)) {
return false;
}
- final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (Objects.nonNull(pipeMeta)) {
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()