This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 36c7e09118c Pipe: Fix leader change is not handled correctly in auto
drop determination (#12617)
36c7e09118c is described below
commit 36c7e09118ca41e124d4b1b2b6aa6c44aa8911de
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 30 16:20:39 2024 +0800
Pipe: Fix leader change is not handled correctly in auto drop determination
(#12617)
---
.../org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java | 2 ++
.../org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java | 4 ++++
2 files changed, 6 insertions(+)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 037b3b4fb95..5d3a4cc2245 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -508,6 +508,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
consensusGroupIdToTaskMetaMap
.get(consensusGroupId.getId())
.setLeaderNodeId(newLeader);
+ // New region leader may contain un-transferred
events
+
pipeMeta.getTemporaryMeta().markDataNodeUncompleted(newLeader);
} else {
consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId());
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
index 14ef390e12b..7a028f4b0e2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
@@ -34,6 +34,10 @@ public class PipeTemporaryMeta {
completedDataNodeIds.put(dataNodeId, dataNodeId);
}
+ public void markDataNodeUncompleted(final int dataNodeId) {
+ completedDataNodeIds.remove(dataNodeId);
+ }
+
public void setRemainingEvent(final int dataNodeId, final long
remainingEventCount) {
nodeId2RemainingEventMap.put(dataNodeId, remainingEventCount);
}