This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 36c7e09118c Pipe: Fix leader change is not handled correctly in auto drop determination (#12617)
36c7e09118c is described below

commit 36c7e09118ca41e124d4b1b2b6aa6c44aa8911de
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 30 16:20:39 2024 +0800

    Pipe: Fix leader change is not handled correctly in auto drop determination (#12617)
---
 .../org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java    | 2 ++
 .../org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java    | 4 ++++
 2 files changed, 6 insertions(+)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 037b3b4fb95..5d3a4cc2245 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -508,6 +508,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
                               consensusGroupIdToTaskMetaMap
                                   .get(consensusGroupId.getId())
                                   .setLeaderNodeId(newLeader);
+                              // New region leader may contain un-transferred events
+                              pipeMeta.getTemporaryMeta().markDataNodeUncompleted(newLeader);
                             } else {
                               consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId());
                             }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
index 14ef390e12b..7a028f4b0e2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
@@ -34,6 +34,10 @@ public class PipeTemporaryMeta {
     completedDataNodeIds.put(dataNodeId, dataNodeId);
   }
 
+  public void markDataNodeUncompleted(final int dataNodeId) {
+    completedDataNodeIds.remove(dataNodeId);
+  }
+
   public void setRemainingEvent(final int dataNodeId, final long remainingEventCount) {
     nodeId2RemainingEventMap.put(dataNodeId, remainingEventCount);
   }

Reply via email to