This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7a4c1273151 IoTV2: Fix closing receiver corner case regarding to
concurrently closing and receiving file. (#15271)
7a4c1273151 is described below
commit 7a4c12731515cfdc17314d135d32466bf6cc7984
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Apr 7 18:53:10 2025 +0800
IoTV2: Fix closing receiver corner case regarding to concurrently closing
and receiving file. (#15271)
* fix closing problem
* fix comment
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../pipeconsensus/PipeConsensusReceiver.java | 26 ++++++++++++++++++++++
2 files changed, 27 insertions(+)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 58c9de7502c..6fb9cdcf7a3 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -312,6 +312,7 @@ public enum TSStatusCode {
PIPE_CONSENSUS_TYPE_ERROR(2205),
CONSENSUS_GROUP_NOT_EXIST(2206),
RATIS_READ_UNAVAILABLE(2207),
+ PIPE_CONSENSUS_CLOSE_ERROR(2208),
;
private final int statusCode;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 6d1a86eba34..10572e0a881 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -82,6 +82,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -106,6 +107,7 @@ public class PipeConsensusReceiver {
private final List<String> receiveDirs = new ArrayList<>();
private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
private final FolderManager folderManager;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
public PipeConsensusReceiver(
PipeConsensus pipeConsensus,
@@ -1043,6 +1045,8 @@ public class PipeConsensusReceiver {
}
public synchronized void handleExit() {
+ // only after closing request executor, can we clean receiver.
+ requestExecutor.tryClose();
// Clear the tsFileWriters and receiver base dirs
pipeConsensusTsFileWriterPool.handleExit(consensusPipeName);
clearAllReceiverBaseDir();
@@ -1363,6 +1367,16 @@ public class PipeConsensusReceiver {
}
}
+ private void tryClose() {
+ // Mark the executor closed while holding the `lock` that onRequest also acquires: closing waits for a request that currently holds the lock, and a request that takes the lock afterwards sees isClosed set and is rejected.
+ lock.lock();
+ try {
+ isClosed.set(true);
+ } finally {
+ lock.unlock();
+ }
+ }
+
private TPipeConsensusTransferResp onRequest(
final TPipeConsensusTransferReq req,
final boolean isTransferTsFilePiece,
@@ -1370,6 +1384,18 @@ public class PipeConsensusReceiver {
long startAcquireLockNanos = System.nanoTime();
lock.lock();
try {
+ if (isClosed.get()) {
+ final TSStatus status =
+ new TSStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_CONSENSUS_CLOSE_ERROR,
+ "PipeConsensus receiver received a request after it was closed."));
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: received a request after receiver was closed and pipe task was dropped.",
+ consensusPipeName);
+ return new TPipeConsensusTransferResp(status);
+ }
+
long startDispatchNanos = System.nanoTime();
metric.recordAcquireExecutorLockTimer(startDispatchNanos -
startAcquireLockNanos);