This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bb51c67c71 IoTConsensusV2: Fix reqBuffer reset and receive concurrent bug (#14215)
0bb51c67c71 is described below

commit 0bb51c67c71b84e80f34b8232d8b5f1b2bbe229a
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Nov 28 10:02:24 2024 +0800

    IoTConsensusV2: Fix reqBuffer reset and receive concurrent bug (#14215)
    
    * fix reqBuffer reset and receive concurrent bug
    
    * fix review
---
 .../pipeconsensus/PipeConsensusReceiver.java       | 27 ++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 20164ff2293..32ead7587e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1391,11 +1391,11 @@ public class PipeConsensusReceiver {
         // Judge whether connector has rebooted or not, if the rebootTimes increases compared to
         // connectorRebootTimes, need to reset receiver because connector has been restarted.
         if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) {
-          resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes());
+          resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes(), condition);
         }
         // Similarly, check pipeTask restartTimes
         if (tCommitId.getPipeTaskRestartTimes() > pipeTaskRestartTimes) {
-          resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes());
+          resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes(), condition);
         }
         // update metric
         if (isTransferTsFilePiece && !reqExecutionOrderBuffer.contains(requestMeta)) {
@@ -1471,6 +1471,21 @@ public class PipeConsensusReceiver {
                   !condition.await(
                       PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS, TimeUnit.MILLISECONDS);
 
+              // If some reqs find the buffer no longer contains their requestMeta after jumping out
+              // from condition.await, it may indicate that during their wait, some reqs with newer
+              // pipeTaskStartTimes or rebootTimes came in and refreshed the requestBuffer. In that
+              // cases we need to discard these requests.
+              if (!reqExecutionOrderBuffer.contains(requestMeta)) {
+                final TSStatus status =
+                    new TSStatus(
+                        RpcUtils.getStatus(
+                            TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
+                            "PipeConsensus receiver received a deprecated request, which may be sent before the connector restart or pipe task restart. Consider to discard it"));
+                LOGGER.info(
+                    "PipeConsensus-PipeName-{}: received a deprecated request, which may be sent before the connector restart or pipe task restart. Consider to discard it",
+                    consensusPipeName);
+                return new TPipeConsensusTransferResp(status);
+              }
               // If the buffer is not full after waiting timeout, we suppose that the sender will
               // not send any more events at this time, that is, the sender has sent all events. At
               // this point we apply the event at reqBuffer's peek
@@ -1515,26 +1530,30 @@ public class PipeConsensusReceiver {
      * Reset all data to initial status and set connectorRebootTimes properly. This method is called
      * when receiver identifies connector has rebooted.
      */
-    private void resetWithNewestRebootTime(int connectorRebootTimes) {
+    private void resetWithNewestRebootTime(int connectorRebootTimes, Condition condition) {
       LOGGER.info(
           "PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes, which indicates the leader has rebooted. receiver will reset all its data.",
           consensusPipeName);
       // since pipe task will resend all data that hasn't synchronized after dataNode reboots, it's
       // safe to clear all events in buffer.
       clear();
+      // signal all deprecated requests that may wait on condition to expire them
+      condition.signalAll();
       // sync the follower's connectorRebootTimes with connector's actual rebootTimes.
       this.connectorRebootTimes = connectorRebootTimes;
       // Note: dataNode rebooting will reset pipeTaskRestartTimes.
       this.pipeTaskRestartTimes = 0;
     }
 
-    private void resetWithNewestRestartTime(int pipeTaskRestartTimes) {
+    private void resetWithNewestRestartTime(int pipeTaskRestartTimes, Condition condition) {
       LOGGER.info(
           "PipeConsensus-PipeName-{}: receiver detected an newer pipeTaskRestartTimes, which indicates the pipe task has restarted. receiver will reset all its data.",
           consensusPipeName);
       // since pipe task will resend all data that hasn't synchronized after restarts, it's safe to
       // clear all events in buffer.
       clear();
+      // signal all deprecated requests that may wait on condition to expire them
+      condition.signalAll();
       this.pipeTaskRestartTimes = pipeTaskRestartTimes;
     }
 

Reply via email to