This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 83d4d2f1dbc Pipe: The DataRegion migration process checks whether the
user pipe has released all related resources (#13183) (#13228)
83d4d2f1dbc is described below
commit 83d4d2f1dbc1b5c4b2104eae55296634b92a30fe
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Aug 20 09:56:23 2024 +0800
Pipe: The DataRegion migration process checks whether the user pipe has
released all related resources (#13183) (#13228)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../java/org/apache/iotdb/consensus/IStateMachine.java | 2 +-
.../iotdb/consensus/iot/IoTConsensusServerImpl.java | 5 +++--
.../iot/service/IoTConsensusRPCServiceProcessor.java | 3 ++-
.../statemachine/dataregion/DataRegionStateMachine.java | 11 ++++++-----
.../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 15 +++++++++++++++
.../listening/AbstractSerializableListeningQueue.java | 4 ++--
.../apache/iotdb/commons/pipe/task/PipeTaskManager.java | 13 +++++++++++++
7 files changed, 42 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index d0473ca1d04..692b60beb2a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -72,7 +72,7 @@ public interface IStateMachine {
*
* @return true if all resources are released successfully
*/
- default boolean hasReleaseAllRegionRelatedResource() {
+ default boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId)
{
return true;
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index c077e978ea6..de3643bca6c 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.service.metric.MetricService;
@@ -621,8 +622,8 @@ public class IoTConsensusServerImpl {
}
}
- public boolean hasReleaseAllRegionRelatedResource() {
- return stateMachine.hasReleaseAllRegionRelatedResource();
+ public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) {
+ return stateMachine.hasReleaseAllRegionRelatedResource(groupId);
}
public void waitReleaseAllRegionRelatedResource(Peer targetPeer)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index ffc60568c7b..07fa67ee539 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -250,7 +250,8 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Ifa
LOGGER.error(message);
return new TWaitReleaseAllRegionRelatedResourceRes(true);
}
- return new
TWaitReleaseAllRegionRelatedResourceRes(impl.hasReleaseAllRegionRelatedResource());
+ return new TWaitReleaseAllRegionRelatedResourceRes(
+ impl.hasReleaseAllRegionRelatedResource(groupId));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 7b168644122..0a377551a9c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.consensus.statemachine.dataregion;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -28,6 +29,7 @@ import
org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.iot.log.GetConsensusReqReaderPlan;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.BaseStateMachine;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -310,15 +312,14 @@ public class DataRegionStateMachine extends
BaseStateMachine {
}
}
- public boolean hasPipeReleaseRegionRelatedResource() {
- // TODO: implement the method to check whether the user pipe has released
all resources related
- return true;
+ public boolean hasPipeReleaseRegionRelatedResource(ConsensusGroupId groupId)
{
+ return
PipeDataNodeAgent.task().hasPipeReleaseRegionRelatedResource(groupId.getId());
}
@Override
- public boolean hasReleaseAllRegionRelatedResource() {
+ public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) {
boolean releaseAllResource = true;
- releaseAllResource &= hasPipeReleaseRegionRelatedResource();
+ releaseAllResource &= hasPipeReleaseRegionRelatedResource(groupId);
return releaseAllResource;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index ab0e8cfafef..888f7c50c96 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -648,6 +648,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
: pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
}
+ public boolean hasPipeReleaseRegionRelatedResource(final int
consensusGroupId) {
+ if (!tryReadLockWithTimeOut(10)) {
+ LOGGER.warn(
+ "Failed to check if pipe has release region related resource with
consensus group id: {}.",
+ consensusGroupId);
+ return false;
+ }
+
+ try {
+ return !pipeTaskManager.hasPipeTaskInConsensusGroup(consensusGroupId);
+ } finally {
+ releaseReadLock();
+ }
+ }
+
///////////////////////// Pipe Consensus /////////////////////////
public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final
int consensusGroupId) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
index afa62aaf61e..f398bf7e0ab 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
@@ -108,7 +108,7 @@ public abstract class AbstractSerializableListeningQueue<E>
implements Closeable
public synchronized boolean serializeToFile(final File snapshotName) throws
IOException {
final File snapshotFile = new File(String.valueOf(snapshotName));
if (snapshotFile.exists() && snapshotFile.isFile()) {
- LOGGER.error(
+ LOGGER.warn(
"Failed to serialize to file, because file {} is already exist.",
snapshotFile.getAbsolutePath());
return false;
@@ -132,7 +132,7 @@ public abstract class AbstractSerializableListeningQueue<E>
implements Closeable
public synchronized void deserializeFromFile(final File snapshotName) throws
IOException {
final File snapshotFile = new File(String.valueOf(snapshotName));
if (!snapshotFile.exists() || !snapshotFile.isFile()) {
- LOGGER.error(
+ LOGGER.warn(
"Failed to deserialize from file, file {} does not exist.",
snapshotFile.getAbsolutePath());
return;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/PipeTaskManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/PipeTaskManager.java
index 45c12f49c04..42780cd66f0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/PipeTaskManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/PipeTaskManager.java
@@ -107,6 +107,19 @@ public class PipeTaskManager {
return pipeMap.get(pipeStaticMeta);
}
+ /**
+ * Judge whether there is a {@link PipeTask} related to the consensus group
id.
+ *
+ * @param consensusGroupId consensus group id
+ * @return true if there is at least one {@link PipeTask} related to the
consensus group id, false
+ * otherwise
+ */
+ public synchronized boolean hasPipeTaskInConsensusGroup(final int
consensusGroupId) {
+ return pipeMap.values().stream()
+ .anyMatch(
+ consensusGroupId2PipeTask ->
consensusGroupId2PipeTask.containsKey(consensusGroupId));
+ }
+
/**
* Get leader region count in this node.
*