This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c20a1d4f91 fix: pick deletion event for historical resend (#17329)
8c20a1d4f91 is described below

commit 8c20a1d4f91f1c33606b701fd4ade406650487d8
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Mar 20 05:55:32 2026 -0500

    fix: pick deletion event for historical resend (#17329)
---
 .../protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java     | 10 +++++++++-
 .../PipeHistoricalDataRegionTsFileAndDeletionSource.java     | 12 ++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
index 3001c40b16f..7cdeef9e826 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
@@ -236,7 +236,15 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink implements ConsensusPipeS
     while (!current.equalsInIoTConsensusV2(event) && iterator.hasNext()) {
       current = iterator.next();
     }
-    iterator.remove();
+    if (current.equalsInIoTConsensusV2(event)) {
+      iterator.remove();
+    } else {
+      LOGGER.warn(
+          "IoTConsensusV2-ConsensusGroup-{}: event-{} not found in transferBuffer, skip removing. queue size = {}",
+          consensusGroupId,
+          event,
+          transferBuffer.size());
+    }
     // update replicate progress
     currentReplicateProgress =
         Math.max(currentReplicateProgress, event.getReplicateIndexForIoTV2());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 809b40f6eba..32fcece4115 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -930,6 +930,18 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
             skipIfNoPrivileges,
             false);
 
+    // if using IoTV2, assign a replicateIndex for this historical deletion event
+    if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
+        && IoTConsensusV2Processor.isShouldReplicate(event)) {
+      event.setReplicateIndexForIoTV2(
+          ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
+      LOGGER.debug(
+          "[{}]Set {} for historical deletion event {}",
+          pipeName,
+          event.getReplicateIndexForIoTV2(),
+          event);
+    }
+
     if (sloppyPattern || isDbNameCoveredByPattern) {
       event.skipParsingPattern();
     }

Reply via email to