This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix-drop-pipe-historical-source-close
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4787a27bc777e2b4b0d5430990ff67aeff277eea
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 15:37:57 2026 +0800

    Pipe: stop historical source supply after drop
---
 .../iotdb/db/pipe/agent/task/PipeDataNodeTask.java |   2 +-
 ...istoricalDataRegionTsFileAndDeletionSource.java | 104 +++++++++++++++------
 2 files changed, 75 insertions(+), 31 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
index 0d4e50cabef..c5e78685b9d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
@@ -68,9 +68,9 @@ public class PipeDataNodeTask implements PipeTask {
   @Override
   public void drop() {
     final long startTime = System.currentTimeMillis();
-    sourceStage.drop();
     processorStage.drop();
     sinkStage.drop();
+    sourceStage.drop();
     LOGGER.info(
         DataNodePipeMessages.DROP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS,
         this,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index e71d80a61a6..898bc9464d1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -78,6 +78,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -162,10 +163,12 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
   private boolean isForwardingPipeRequests;
 
   private volatile boolean hasBeenStarted = false;
+  private volatile boolean hasBeenClosed = false;
 
   private Queue<PersistentResource> pendingQueue;
   private final Map<TsFileResource, Set<String>> filteredTsFileResources2TableNames =
       new HashMap<>();
+  private final Set<TsFileResource> pinnedTsFileResources = new HashSet<>();
   private final Map<PersistentResource, Long> pendingResource2ReplicateIndexForIoTV2 =
       new HashMap<>();
   private int extractedHistoricalTsFileCount = 0;
@@ -480,6 +483,9 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
 
   @Override
   public synchronized void start() {
+    if (hasBeenClosed) {
+      return;
+    }
     if (!shouldExtractInsertion && !shouldExtractDeletion) {
       hasBeenStarted = true;
       return;
@@ -508,12 +514,21 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
 
       if (shouldExtractInsertion) {
         flushTsFilesForExtraction(dataRegion);
+        if (hasBeenClosed) {
+          return;
+        }
         extractTsFiles(dataRegion, startHistoricalExtractionTime, originalResourceList);
       }
+      if (hasBeenClosed) {
+        return;
+      }
       if (shouldExtractDeletion) {
         Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId))
             .ifPresent(manager -> extractDeletions(manager, originalResourceList));
       }
+      if (hasBeenClosed) {
+        return;
+      }
 
       // Sort tsFileResource and deletionResource
       long startTime = System.currentTimeMillis();
@@ -561,6 +576,9 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       final DataRegion dataRegion,
       final long startHistoricalExtractionTime,
       final List<PersistentResource> originalResourceList) {
+    if (hasBeenClosed) {
+      return;
+    }
     final TsFileManager tsFileManager = dataRegion.getTsFileManager();
     tsFileManager.readLock();
     try {
@@ -580,7 +598,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
               .peek(originalResourceList::add)
               .filter(
                   resource ->
-                      isHistoricalSourceEnabled
+                      !hasBeenClosed
+                          && isHistoricalSourceEnabled
                           &&
                           // Some resource is marked as deleted but not removed from the list.
                           !resource.isDeleted()
@@ -613,7 +632,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
               .peek(originalResourceList::add)
               .filter(
                   resource ->
-                      isHistoricalSourceEnabled
+                      !hasBeenClosed
+                          && isHistoricalSourceEnabled
                           &&
                           // Some resource is marked as deleted but not removed from the list.
                           !resource.isDeleted()
@@ -645,11 +665,15 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
           .keySet()
           .removeIf(
               resource -> {
+                if (hasBeenClosed) {
+                  return true;
+                }
                 // Pin the resource, in case the file is removed by compaction or anything.
                 // Will unpin it after the PipeTsFileInsertionEvent is created and pinned.
                 try {
                   PipeDataNodeResourceManager.tsfile()
                       .pinTsFileResource(resource, shouldTransferModFile, pipeName);
+                  pinnedTsFileResources.add(resource);
                   return false;
                 } catch (final IOException e) {
                   LOGGER.warn(
@@ -821,17 +845,21 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
 
   @Override
   public synchronized Event supply() {
+    if (hasBeenClosed) {
+      return null;
+    }
     if (!hasBeenStarted && StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
       start();
     }
 
-    if (Objects.isNull(pendingQueue)) {
+    if (hasBeenClosed || Objects.isNull(pendingQueue)) {
       return null;
     }
 
     final PersistentResource resource = pendingQueue.peek();
     if (resource == null) {
-      return supplyTerminateEvent();
+      final Event event = supplyTerminateEvent();
+      return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event;
     }
 
     if (resource instanceof TsFileResource) {
@@ -839,17 +867,26 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) {
         clearReplicateIndexForResource(tsFileResource);
         pendingQueue.poll();
-        return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex());
+        final Event event = supplyProgressReportEvent(tsFileResource.getMaxProgressIndex());
+        return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event;
       }
 
       final Event event = supplyTsFileEvent(tsFileResource);
       pendingQueue.poll();
-      return event;
+      return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event;
     }
 
     final Event event = supplyDeletionEvent((DeletionResource) resource);
     pendingQueue.poll();
-    return event;
+    return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event;
+  }
+
+  private Event clearReferenceCountAndReturnNull(final Event event) {
+    if (event instanceof EnrichedEvent) {
+      ((EnrichedEvent) event)
+          .clearReferenceCount(PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
+    }
+    return null;
   }
 
   private Event supplyTerminateEvent() {
@@ -909,6 +946,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
             dataRegionId,
             resource.getTsFilePath(),
             e);
+      } finally {
+        pinnedTsFileResources.remove(resource);
       }
     }
   }
@@ -1003,6 +1042,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
               pipeName,
               dataRegionId,
               resource.getTsFilePath());
+        } finally {
+          pinnedTsFileResources.remove(resource);
         }
       }
     }
@@ -1086,8 +1127,9 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
   public synchronized boolean hasConsumedAll() {
     // If the pendingQueue is null when the function is called, it implies that the extractor only
     // extracts deletion thus the historical event has nothing to consume.
-    return hasBeenStarted
-        && (Objects.isNull(pendingQueue) || pendingQueue.isEmpty() && isTerminateSignalSent);
+    return hasBeenClosed
+        || hasBeenStarted
+            && (Objects.isNull(pendingQueue) || pendingQueue.isEmpty() && isTerminateSignalSent);
   }
 
   @Override
@@ -1096,30 +1138,32 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
   }
 
   @Override
-  public synchronized void close() {
-    if (!isTerminateSignalSent) {
-      PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, creationTime, dataRegionId);
-    }
-    if (Objects.nonNull(pendingQueue)) {
-      pendingQueue.forEach(
+  public void close() {
+    hasBeenClosed = true;
+    synchronized (this) {
+      if (!isTerminateSignalSent) {
+        PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, creationTime, dataRegionId);
+      }
+      pinnedTsFileResources.forEach(
           resource -> {
-            if (resource instanceof TsFileResource) {
-              try {
-                PipeDataNodeResourceManager.tsfile()
-                    .unpinTsFileResource(
-                        (TsFileResource) resource, shouldTransferModFile, pipeName);
-              } catch (final IOException e) {
-                LOGGER.warn(
-                    DataNodePipeMessages.PIPE_FAILED_TO_UNPIN_TSFILERESOURCE_AFTER_DROPPING,
-                    pipeName,
-                    dataRegionId,
-                    ((TsFileResource) resource).getTsFilePath());
-              }
+            try {
+              PipeDataNodeResourceManager.tsfile()
+                  .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
+            } catch (final IOException e) {
+              LOGGER.warn(
+                  DataNodePipeMessages.PIPE_FAILED_TO_UNPIN_TSFILERESOURCE_AFTER_DROPPING,
+                  pipeName,
+                  dataRegionId,
+                  resource.getTsFilePath());
             }
           });
-      pendingQueue.clear();
-      pendingQueue = null;
+      pinnedTsFileResources.clear();
+      if (Objects.nonNull(pendingQueue)) {
+        pendingQueue.clear();
+        pendingQueue = null;
+      }
+      filteredTsFileResources2TableNames.clear();
+      pendingResource2ReplicateIndexForIoTV2.clear();
     }
-    pendingResource2ReplicateIndexForIoTV2.clear();
   }
 }

Reply via email to