This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d7649d990c IoTV2: Improve receiver closed and clean dir logic (#15313)
9d7649d990c is described below

commit 9d7649d990c03a925d6b7917c451ab2fe7e6fdc2
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Apr 14 15:39:17 2025 +0800

    IoTV2: Improve receiver closed and clean dir logic (#15313)
    
    * refine close for receiver
    
    * complete
    
    * improve
    
    * reformat
    
    * reformat
    
    * spotless
    
    * Fix review
    
    * Fix review
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 12 +++
 .../pipeconsensus/PipeConsensusReceiver.java       | 86 +++++++++++-----------
 .../pipeconsensus/PipeConsensusReceiverAgent.java  | 36 ++++++++-
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  2 +-
 4 files changed, 92 insertions(+), 44 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0af0be13515..7e2191f8293 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -341,6 +341,18 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     return true;
   }
 
+  @Override
+  protected boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws IllegalPathException {
+    String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
+    if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+      // Release corresponding receiver's resource
+      PipeDataNodeAgent.receiver()
+          .pipeConsensus()
+          .markConsensusPipeAsCreated(new ConsensusPipeName(pipeName));
+    }
+    return super.createPipe(pipeMetaFromCoordinator);
+  }
+
   @Override
   protected boolean dropPipe(final String pipeName) {
     // Get the pipe meta first because it is removed after super#dropPipe(pipeName)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index f81cbdfe88f..5f353a61445 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1072,8 +1072,8 @@ public class PipeConsensusReceiver {
   public synchronized void handleExit() {
     // only after closing request executor, can we clean receiver.
     requestExecutor.tryClose();
-    // Clear the tsFileWriters and receiver base dirs
-    pipeConsensusTsFileWriterPool.handleExit(consensusPipeName);
+    // Clear the tsFileWriters, receiverBuffer and receiver base dirs
+    requestExecutor.clear(false);
     clearAllReceiverBaseDir();
     // remove metric
     MetricService.getInstance().removeMetricSet(pipeConsensusReceiverMetrics);
@@ -1425,33 +1425,6 @@ public class PipeConsensusReceiver {
       this.tsFileWriterPool = tsFileWriterPool;
     }
 
-    private void onSuccess(TCommitId commitId, boolean isTransferTsFileSeal) {
-      LOGGER.info(
-          "PipeConsensus-PipeName-{}: process no.{} event successfully!",
-          consensusPipeName,
-          commitId);
-      RequestMeta curMeta = reqExecutionOrderBuffer.pollFirst();
-      onSyncedReplicateIndex = commitId.getReplicateIndex();
-      // update metric, notice that curMeta is never null.
-      if (isTransferTsFileSeal) {
-        tsFileEventCount.decrementAndGet();
-        metric.recordReceiveTsFileTimer(System.nanoTime() - curMeta.getStartApplyNanos());
-      } else {
-        WALEventCount.decrementAndGet();
-        metric.recordReceiveWALTimer(System.nanoTime() - curMeta.getStartApplyNanos());
-      }
-    }
-
-    private void tryClose() {
-      // It will not be closed until all requests sent before closing are done.
-      lock.lock();
-      try {
-        isClosed.set(true);
-      } finally {
-        lock.unlock();
-      }
-    }
-
     private TPipeConsensusTransferResp onRequest(
         final TPipeConsensusTransferReq req,
         final boolean isTransferTsFilePiece,
@@ -1459,16 +1432,10 @@ public class PipeConsensusReceiver {
       long startAcquireLockNanos = System.nanoTime();
       lock.lock();
       try {
+        // once thread gets lock, it will judge whether receiver is closed
         if (isClosed.get()) {
-          final TSStatus status =
-              new TSStatus(
-                  RpcUtils.getStatus(
-                      TSStatusCode.PIPE_CONSENSUS_CLOSE_ERROR,
-                      "PipeConsensus receiver received a request after it was closed."));
-          LOGGER.info(
-              "PipeConsensus-PipeName-{}: received a request after receiver was closed and pipe task was dropped.",
-              consensusPipeName);
-          return new TPipeConsensusTransferResp(status);
+          return PipeConsensusReceiverAgent.closedResp(
+              consensusPipeName.toString(), req.getCommitId());
         }
 
         long startDispatchNanos = System.nanoTime();
@@ -1585,6 +1552,12 @@ public class PipeConsensusReceiver {
                   !condition.await(
                       PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS, TimeUnit.MILLISECONDS);
 
+              // once thread gets lock, it will judge whether receiver is closed
+              if (isClosed.get()) {
+                return PipeConsensusReceiverAgent.closedResp(
+                    consensusPipeName.toString(), req.getCommitId());
+              }
+
               // If some reqs find the buffer no longer contains their requestMeta after jumping out
               // from condition.await, it may indicate that during their wait, some reqs with newer
               // pipeTaskStartTimes or rebootTimes came in and refreshed the requestBuffer. In that
@@ -1649,7 +1622,7 @@ public class PipeConsensusReceiver {
           consensusPipeName);
       // since pipe task will resend all data that hasn't synchronized after dataNode reboots, it's
       // safe to clear all events in buffer.
-      clear();
+      clear(true);
       // sync the follower's connectorRebootTimes with connector's actual rebootTimes.
       this.connectorRebootTimes = connectorRebootTimes;
       this.pipeTaskRestartTimes = 0;
@@ -1661,14 +1634,43 @@ public class PipeConsensusReceiver {
           consensusPipeName);
       // since pipe task will resend all data that hasn't synchronized after restarts, it's safe to
       // clear all events in buffer.
-      clear();
+      clear(false);
       this.pipeTaskRestartTimes = pipeTaskRestartTimes;
     }
 
-    private void clear() {
+    private void onSuccess(TCommitId commitId, boolean isTransferTsFileSeal) {
+      LOGGER.info(
+          "PipeConsensus-PipeName-{}: process no.{} event successfully!",
+          consensusPipeName,
+          commitId);
+      RequestMeta curMeta = reqExecutionOrderBuffer.pollFirst();
+      onSyncedReplicateIndex = commitId.getReplicateIndex();
+      // update metric, notice that curMeta is never null.
+      if (isTransferTsFileSeal) {
+        tsFileEventCount.decrementAndGet();
+        metric.recordReceiveTsFileTimer(System.nanoTime() - curMeta.getStartApplyNanos());
+      } else {
+        WALEventCount.decrementAndGet();
+        metric.recordReceiveWALTimer(System.nanoTime() - curMeta.getStartApplyNanos());
+      }
+    }
+
+    private void clear(boolean resetSyncIndex) {
       this.reqExecutionOrderBuffer.clear();
       this.tsFileWriterPool.handleExit(consensusPipeName);
-      this.onSyncedReplicateIndex = 0;
+      if (resetSyncIndex) {
+        this.onSyncedReplicateIndex = 0;
+      }
+    }
+
+    private void tryClose() {
+      // It will not be closed until all requests sent before closing are done.
+      lock.lock();
+      try {
+        isClosed.set(true);
+      } finally {
+        lock.unlock();
+      }
     }
 
     private TPipeConsensusTransferResp deprecatedResp(String msg) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
index 524b93dda4c..6d3bef50a00 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.pipe.PipeConsensus;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeReceiver;
+import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,7 +40,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -66,6 +69,8 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
           ConsensusGroupId, Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>>>
       replicaReceiverMap = new ConcurrentHashMap<>();
 
+  private final Set<ConsensusPipeName> createdConsensusPipes = new CopyOnWriteArraySet<>();
+
   private PipeConsensus pipeConsensus;
 
   public PipeConsensusReceiverAgent() {
@@ -85,13 +90,32 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
     }
   }
 
+  public static TPipeConsensusTransferResp closedResp(String consensusInfo, TCommitId tCommitId) {
+    final TSStatus status =
+        new TSStatus(
+            RpcUtils.getStatus(
+                TSStatusCode.PIPE_CONSENSUS_CLOSE_ERROR,
+                "PipeConsensus receiver received a request after it was closed."));
+    LOGGER.info(
+        "PipeConsensus-{}: receive on-the-fly no.{} event after consensus pipe was dropped, discard it",
+        consensusInfo,
+        tCommitId);
+    return new TPipeConsensusTransferResp(status);
+  }
+
   @Override
   public TPipeConsensusTransferResp receive(TPipeConsensusTransferReq req) {
     final byte reqVersion = req.getVersion();
     if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
       final ConsensusGroupId consensusGroupId =
           ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-      return getReceiver(consensusGroupId, req.getDataNodeId(), reqVersion).receive(req);
+      final PipeConsensusReceiver receiver =
+          getReceiver(consensusGroupId, req.getDataNodeId(), reqVersion);
+
+      if (receiver == null) {
+        return closedResp(consensusGroupId.toString(), req.getCommitId());
+      }
+      return receiver.receive(req);
     } else {
       final TSStatus status =
           RpcUtils.getStatus(
@@ -111,6 +135,11 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
     // 2. Route to given consensusPipeTask's receiver
     ConsensusPipeName consensusPipeName =
         new ConsensusPipeName(consensusGroupId, leaderDataNodeId, thisNodeId);
+    // 3. Judge whether pipe task was dropped
+    if (!createdConsensusPipes.contains(consensusPipeName)) {
+      return null;
+    }
+
     AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
     AtomicReference<PipeConsensusReceiver> receiverReference =
         consensusPipe2ReceiverMap.computeIfAbsent(
@@ -192,9 +221,14 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
         consensusPipe2ReciverMap.getOrDefault(pipeName, null);
     // 3. Release receiver
     if (receiverReference != null) {
+      createdConsensusPipes.remove(pipeName);
       receiverReference.get().handleExit();
       receiverReference.set(null);
       consensusPipe2ReciverMap.remove(pipeName);
     }
   }
+
+  public void markConsensusPipeAsCreated(ConsensusPipeName pipeName) {
+    createdConsensusPipes.add(pipeName);
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 2e3b9d0a868..08bb1713d72 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -453,7 +453,7 @@ public abstract class PipeTaskAgent {
    *     if the pipe already exists or is created but should not be started
    * @throws IllegalStateException if the status is illegal
    */
-  private boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws IllegalPathException {
+  protected boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws IllegalPathException {
     final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
     final long creationTime = pipeMetaFromCoordinator.getStaticMeta().getCreationTime();
 

Reply via email to