This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c78546d27ef IoTV2: Receiver aware of pipe restart & Improve condition schedule (#15314)
c78546d27ef is described below

commit c78546d27ef185f82d8d9d8cb6210183fc6812cd
Author: Peng Junzhi <78788603+peng...@users.noreply.github.com>
AuthorDate: Fri Apr 11 21:01:05 2025 +0800

    IoTV2: Receiver aware of pipe restart & Improve condition schedule (#15314)
    
    * make the receiver aware of pipe task restarts
    
    * improve condition scheduling (signal waiting requests when the lock is released)
    
    * fix comment
    
    * improve logging
---
 .../pipeconsensus/PipeConsensusAsyncConnector.java |  2 +
 .../pipeconsensus/PipeConsensusSyncConnector.java  |  3 +
 .../PipeConsensusTransferBatchReqBuilder.java      |  1 +
 .../pipeconsensus/PipeConsensusReceiver.java       | 87 +++++++++++++++-------
 .../src/main/thrift/pipeconsensus.thrift           |  3 +-
 5 files changed, 67 insertions(+), 29 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index a44c30fca63..1fdb12df8d8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -276,6 +276,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
       tCommitId =
           new TCommitId(
               pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
+              
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
               pipeInsertNodeTabletInsertionEvent.getRebootTimes());
 
       // We increase the reference count for this event to determine if the 
event may be released.
@@ -354,6 +355,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
     TCommitId tCommitId =
         new TCommitId(
             pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(),
+            pipeTsFileInsertionEvent.getCommitterKey().getRestartTimes(),
             pipeTsFileInsertionEvent.getRebootTimes());
     TConsensusGroupId tConsensusGroupId =
         new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index c92f99fb3ac..5365666a7a1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -248,6 +248,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
     final TCommitId tCommitId =
         new TCommitId(
             pipeDeleteDataNodeEvent.getReplicateIndexForIoTV2(),
+            pipeDeleteDataNodeEvent.getCommitterKey().getRestartTimes(),
             pipeDeleteDataNodeEvent.getRebootTimes());
     final TConsensusGroupId tConsensusGroupId =
         new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
@@ -317,6 +318,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
     final TCommitId tCommitId =
         new TCommitId(
             pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
+            
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
             pipeInsertNodeTabletInsertionEvent.getRebootTimes());
     final TConsensusGroupId tConsensusGroupId =
         new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
@@ -376,6 +378,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
       final TCommitId tCommitId =
           new TCommitId(
               pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(),
+              pipeTsFileInsertionEvent.getCommitterKey().getRestartTimes(),
               pipeTsFileInsertionEvent.getRebootTimes());
       final TConsensusGroupId tConsensusGroupId =
           new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index e86193dad17..f39613cf2c0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -183,6 +183,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder 
implements AutoClosea
     commitId =
         new TCommitId(
             pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
+            
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
             pipeInsertNodeTabletInsertionEvent.getRebootTimes());
 
     // Read the bytebuffer from the wal file and transfer it directly without 
serializing or
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 10572e0a881..a996b6b9c21 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1325,6 +1325,9 @@ public class PipeConsensusReceiver {
    * although events can arrive receiver in a random sequence.
    */
   private class RequestExecutor {
+    private static final String THIS_NODE = "this node";
+    private static final String PIPE_TASK = "pipe task";
+
     // An ordered set that buffers transfer requests' TCommitId, whose length 
is not larger than
     // PIPE_CONSENSUS_PIPELINE_SIZE.
     // Here we use set is to avoid duplicate events being received in some 
special cases
@@ -1337,6 +1340,7 @@ public class PipeConsensusReceiver {
     private final AtomicInteger tsFileEventCount = new AtomicInteger(0);
     private volatile long onSyncedReplicateIndex = 0;
     private volatile int connectorRebootTimes = 0;
+    private volatile int pipeTaskRestartTimes = 0;
 
     public RequestExecutor(
         PipeConsensusReceiverMetrics metric, PipeConsensusTsFileWriterPool 
tsFileWriterPool) {
@@ -1415,20 +1419,21 @@ public class PipeConsensusReceiver {
         // the request with incremental rebootTimes, the {3} sent before the 
leader restart needs to
         // be discarded.
         if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
-          final TSStatus status =
-              new TSStatus(
-                  RpcUtils.getStatus(
-                      TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
-                      "PipeConsensus receiver received a deprecated request, 
which may be sent before the connector restart. Consider to discard it"));
-          LOGGER.info(
-              "PipeConsensus-PipeName-{}: received a deprecated request, which 
may be sent before the connector restart. Consider to discard it",
-              consensusPipeName);
-          return new TPipeConsensusTransferResp(status);
+          return deprecatedResp(THIS_NODE);
+        }
+        // Similarly, check pipeTask restartTimes
+        if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+            && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
+          return deprecatedResp(PIPE_TASK);
         }
         // Judge whether connector has rebooted or not, if the rebootTimes 
increases compared to
         // connectorRebootTimes, need to reset receiver because connector has 
been restarted.
         if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) {
-          resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes(), 
condition);
+          resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes());
+        }
+        // Similarly, check pipeTask restartTimes
+        if (tCommitId.getPipeTaskRestartTimes() > pipeTaskRestartTimes) {
+          resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes());
         }
         // update metric
         if (isTransferTsFilePiece && 
!reqExecutionOrderBuffer.contains(requestMeta)) {
@@ -1476,14 +1481,19 @@ public class PipeConsensusReceiver {
             if (resp != null
                 && resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
               onSuccess(tCommitId, isTransferTsFileSeal);
-              // signal all other reqs to accelerate dispatch process.
-              condition.signalAll();
             }
             return resp;
           }
 
           if (reqExecutionOrderBuffer.size() >= 
IOTDB_CONFIG.getIotConsensusV2PipelineSize()
               && reqExecutionOrderBuffer.first().equals(requestMeta)) {
+            // TODO: Turn it to debug after GA
+            LOGGER.info(
+                "PipeConsensus-PipeName-{}: no.{} event get executed because 
receiver buffer's len >= pipeline, current receiver syncIndex {}, current 
buffer len {}",
+                consensusPipeName,
+                tCommitId,
+                onSyncedReplicateIndex,
+                reqExecutionOrderBuffer.size());
             long startApplyNanos = System.nanoTime();
             metric.recordDispatchWaitingTimer(startApplyNanos - 
startDispatchNanos);
             requestMeta.setStartApplyNanos(startApplyNanos);
@@ -1493,8 +1503,6 @@ public class PipeConsensusReceiver {
             if (resp != null
                 && resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
               onSuccess(tCommitId, isTransferTsFileSeal);
-              // signal all other reqs that may wait for this event
-              condition.signalAll();
             }
             return resp;
           } else {
@@ -1511,15 +1519,7 @@ public class PipeConsensusReceiver {
               // pipeTaskStartTimes or rebootTimes came in and refreshed the 
requestBuffer. In that
               // cases we need to discard these requests.
               if (!reqExecutionOrderBuffer.contains(requestMeta)) {
-                final TSStatus status =
-                    new TSStatus(
-                        RpcUtils.getStatus(
-                            TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
-                            "PipeConsensus receiver received a deprecated 
request, which may be sent before the connector restart or pipe task restart. 
Consider to discard it"));
-                LOGGER.info(
-                    "PipeConsensus-PipeName-{}: received a deprecated request, 
which may be sent before the connector restart or pipe task restart. Consider 
to discard it",
-                    consensusPipeName);
-                return new TPipeConsensusTransferResp(status);
+                return deprecatedResp(String.format("%s or %s", THIS_NODE, 
PIPE_TASK));
               }
               // If the buffer is not full after waiting timeout, we suppose 
that the sender will
               // not send any more events at this time, that is, the sender 
has sent all events. At
@@ -1528,6 +1528,12 @@ public class PipeConsensusReceiver {
                   && reqExecutionOrderBuffer.size() < 
IOTDB_CONFIG.getIotConsensusV2PipelineSize()
                   && reqExecutionOrderBuffer.first() != null
                   && reqExecutionOrderBuffer.first().equals(requestMeta)) {
+                // TODO: Turn it to debug after GA
+                LOGGER.info(
+                    "PipeConsensus-PipeName-{}: no.{} event get executed after 
awaiting timeout, current receiver syncIndex: {}",
+                    consensusPipeName,
+                    tCommitId,
+                    onSyncedReplicateIndex);
                 long startApplyNanos = System.nanoTime();
                 metric.recordDispatchWaitingTimer(startApplyNanos - 
startDispatchNanos);
                 requestMeta.setStartApplyNanos(startApplyNanos);
@@ -1536,8 +1542,6 @@ public class PipeConsensusReceiver {
                 if (resp != null
                     && resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                   onSuccess(tCommitId, isTransferTsFileSeal);
-                  // signal all other reqs that may wait for this event
-                  condition.signalAll();
                 }
                 return resp;
               }
@@ -1557,6 +1561,9 @@ public class PipeConsensusReceiver {
           }
         }
       } finally {
+        // let all threads that may still await become active again to acquire 
lock instead of
+        // meaningless sleeping in the condition while lock is already 
released.
+        condition.signalAll();
         lock.unlock();
       }
     }
@@ -1565,17 +1572,26 @@ public class PipeConsensusReceiver {
      * Reset all data to initial status and set connectorRebootTimes properly. 
This method is called
      * when receiver identifies connector has rebooted.
      */
-    private void resetWithNewestRebootTime(int connectorRebootTimes, Condition 
condition) {
+    private void resetWithNewestRebootTime(int connectorRebootTimes) {
       LOGGER.info(
           "PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes, 
which indicates the leader has rebooted. receiver will reset all its data.",
           consensusPipeName);
       // since pipe task will resend all data that hasn't synchronized after 
dataNode reboots, it's
       // safe to clear all events in buffer.
       clear();
-      // signal all deprecated requests that may wait on condition to expire 
them
-      condition.signalAll();
       // sync the follower's connectorRebootTimes with connector's actual 
rebootTimes.
       this.connectorRebootTimes = connectorRebootTimes;
+      this.pipeTaskRestartTimes = 0;
+    }
+
+    private void resetWithNewestRestartTime(int pipeTaskRestartTimes) {
+      LOGGER.info(
+          "PipeConsensus-PipeName-{}: receiver detected an newer 
pipeTaskRestartTimes, which indicates the pipe task has restarted. receiver 
will reset all its data.",
+          consensusPipeName);
+      // since pipe task will resend all data that hasn't synchronized after 
restarts, it's safe to
+      // clear all events in buffer.
+      clear();
+      this.pipeTaskRestartTimes = pipeTaskRestartTimes;
     }
 
     private void clear() {
@@ -1583,6 +1599,21 @@ public class PipeConsensusReceiver {
       this.tsFileWriterPool.handleExit(consensusPipeName);
       this.onSyncedReplicateIndex = 0;
     }
+
+    private TPipeConsensusTransferResp deprecatedResp(String msg) {
+      final TSStatus status =
+          new TSStatus(
+              RpcUtils.getStatus(
+                  TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
+                  String.format(
+                      "PipeConsensus receiver received a deprecated request, 
which may be sent before %s restarts. Consider to discard it",
+                      msg)));
+      LOGGER.info(
+          "PipeConsensus-PipeName-{}: received a deprecated request, which may 
be sent before {} restarts. Consider to discard it",
+          consensusPipeName,
+          msg);
+      return new TPipeConsensusTransferResp(status);
+    }
   }
 
   private static class RequestMeta {
diff --git 
a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift 
b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
index 9bae689e692..56ef64a606b 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
@@ -22,7 +22,8 @@ namespace java org.apache.iotdb.consensus.pipe.thrift
 
 struct TCommitId {
   1:required i64 replicateIndex
-  2:required i32 dataNodeRebootTimes
+  2:required i32 pipeTaskRestartTimes
+  3:required i32 dataNodeRebootTimes
 }
 
 struct TPipeConsensusTransferReq {

Reply via email to