This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 83d4d2f1dbc Pipe: The DataRegion migration process checks whether the 
user pipe has released all related resources (#13183) (#13228)
83d4d2f1dbc is described below

commit 83d4d2f1dbc1b5c4b2104eae55296634b92a30fe
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Aug 20 09:56:23 2024 +0800

    Pipe: The DataRegion migration process checks whether the user pipe has 
released all related resources (#13183) (#13228)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../java/org/apache/iotdb/consensus/IStateMachine.java    |  2 +-
 .../iotdb/consensus/iot/IoTConsensusServerImpl.java       |  5 +++--
 .../iot/service/IoTConsensusRPCServiceProcessor.java      |  3 ++-
 .../statemachine/dataregion/DataRegionStateMachine.java   | 11 ++++++-----
 .../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java   | 15 +++++++++++++++
 .../listening/AbstractSerializableListeningQueue.java     |  4 ++--
 .../apache/iotdb/commons/pipe/task/PipeTaskManager.java   | 13 +++++++++++++
 7 files changed, 42 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index d0473ca1d04..692b60beb2a 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -72,7 +72,7 @@ public interface IStateMachine {
    *
    * @return true if all resources are released successfully
    */
-  default boolean hasReleaseAllRegionRelatedResource() {
+  default boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) 
{
     return true;
   }
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index c077e978ea6..de3643bca6c 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
 import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.commons.service.metric.MetricService;
@@ -621,8 +622,8 @@ public class IoTConsensusServerImpl {
     }
   }
 
-  public boolean hasReleaseAllRegionRelatedResource() {
-    return stateMachine.hasReleaseAllRegionRelatedResource();
+  public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) {
+    return stateMachine.hasReleaseAllRegionRelatedResource(groupId);
   }
 
   public void waitReleaseAllRegionRelatedResource(Peer targetPeer)
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index ffc60568c7b..07fa67ee539 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -250,7 +250,8 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Ifa
       LOGGER.error(message);
       return new TWaitReleaseAllRegionRelatedResourceRes(true);
     }
-    return new 
TWaitReleaseAllRegionRelatedResourceRes(impl.hasReleaseAllRegionRelatedResource());
+    return new TWaitReleaseAllRegionRelatedResourceRes(
+        impl.hasReleaseAllRegionRelatedResource(groupId));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 7b168644122..0a377551a9c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.consensus.statemachine.dataregion;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -28,6 +29,7 @@ import 
org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.iot.log.GetConsensusReqReaderPlan;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.BaseStateMachine;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -310,15 +312,14 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
     }
   }
 
-  public boolean hasPipeReleaseRegionRelatedResource() {
-    // TODO: implement the method to check whether the user pipe has released 
all resources related
-    return true;
+  public boolean hasPipeReleaseRegionRelatedResource(ConsensusGroupId groupId) 
{
+    return 
PipeDataNodeAgent.task().hasPipeReleaseRegionRelatedResource(groupId.getId());
   }
 
   @Override
-  public boolean hasReleaseAllRegionRelatedResource() {
+  public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) {
     boolean releaseAllResource = true;
-    releaseAllResource &= hasPipeReleaseRegionRelatedResource();
+    releaseAllResource &= hasPipeReleaseRegionRelatedResource(groupId);
     return releaseAllResource;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index ab0e8cfafef..888f7c50c96 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -648,6 +648,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
         : pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
   }
 
+  public boolean hasPipeReleaseRegionRelatedResource(final int 
consensusGroupId) {
+    if (!tryReadLockWithTimeOut(10)) {
+      LOGGER.warn(
+          "Failed to check if pipe has release region related resource with 
consensus group id: {}.",
+          consensusGroupId);
+      return false;
+    }
+
+    try {
+      return !pipeTaskManager.hasPipeTaskInConsensusGroup(consensusGroupId);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   ///////////////////////// Pipe Consensus /////////////////////////
 
   public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final 
int consensusGroupId) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
index afa62aaf61e..f398bf7e0ab 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
@@ -108,7 +108,7 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
   public synchronized boolean serializeToFile(final File snapshotName) throws 
IOException {
     final File snapshotFile = new File(String.valueOf(snapshotName));
     if (snapshotFile.exists() && snapshotFile.isFile()) {
-      LOGGER.error(
+      LOGGER.warn(
           "Failed to serialize to file, because file {} is already exist.",
           snapshotFile.getAbsolutePath());
       return false;
@@ -132,7 +132,7 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
   public synchronized void deserializeFromFile(final File snapshotName) throws 
IOException {
     final File snapshotFile = new File(String.valueOf(snapshotName));
     if (!snapshotFile.exists() || !snapshotFile.isFile()) {
-      LOGGER.error(
+      LOGGER.warn(
           "Failed to deserialize from file, file {} does not exist.",
           snapshotFile.getAbsolutePath());
       return;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/PipeTaskManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/PipeTaskManager.java
index 45c12f49c04..42780cd66f0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/PipeTaskManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/PipeTaskManager.java
@@ -107,6 +107,19 @@ public class PipeTaskManager {
     return pipeMap.get(pipeStaticMeta);
   }
 
+  /**
+   * Judge whether there is a {@link PipeTask} related to the consensus group 
id.
+   *
+   * @param consensusGroupId consensus group id
+   * @return true if there is at least one {@link PipeTask} related to the 
consensus group id, false
+   *     otherwise
+   */
+  public synchronized boolean hasPipeTaskInConsensusGroup(final int 
consensusGroupId) {
+    return pipeMap.values().stream()
+        .anyMatch(
+            consensusGroupId2PipeTask -> 
consensusGroupId2PipeTask.containsKey(consensusGroupId));
+  }
+
   /**
    * Get leader region count in this node.
    *

Reply via email to