This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8c20a1d4f91 fix: pick deletion event for historical resend (#17329)
8c20a1d4f91 is described below
commit 8c20a1d4f91f1c33606b701fd4ade406650487d8
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Mar 20 05:55:32 2026 -0500
fix: pick deletion event for historical resend (#17329)
---
.../protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java | 10 +++++++++-
.../PipeHistoricalDataRegionTsFileAndDeletionSource.java | 12 ++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
index 3001c40b16f..7cdeef9e826 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
@@ -236,7 +236,15 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink
implements ConsensusPipeS
while (!current.equalsInIoTConsensusV2(event) && iterator.hasNext()) {
current = iterator.next();
}
- iterator.remove();
+ if (current.equalsInIoTConsensusV2(event)) {
+ iterator.remove();
+ } else {
+ LOGGER.warn(
+ "IoTConsensusV2-ConsensusGroup-{}: event-{} not found in
transferBuffer, skip removing. queue size = {}",
+ consensusGroupId,
+ event,
+ transferBuffer.size());
+ }
// update replicate progress
currentReplicateProgress =
Math.max(currentReplicateProgress, event.getReplicateIndexForIoTV2());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 809b40f6eba..32fcece4115 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -930,6 +930,18 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
skipIfNoPrivileges,
false);
+ // if using IoTV2, assign a replicateIndex for this historical deletion
event
+ if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
+ && IoTConsensusV2Processor.isShouldReplicate(event)) {
+ event.setReplicateIndexForIoTV2(
+
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
+ LOGGER.debug(
+ "[{}]Set {} for historical deletion event {}",
+ pipeName,
+ event.getReplicateIndexForIoTV2(),
+ event);
+ }
+
if (sloppyPattern || isDbNameCoveredByPattern) {
event.skipParsingPattern();
}