This is an automated email from the ASF dual-hosted git repository.

VGalaxies pushed a commit to branch fix/export-tsfile-drain-before-termination
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c3c92b39dceb8d932d5f09c399ed43ce16e666cc
Author: VGalaxies <[email protected]>
AuthorDate: Fri Jun 12 04:31:15 2026 +0000

    Subscription: drain tsfile batches before termination
---
 .../broker/SubscriptionPrefetchingQueue.java       | 68 +++++++++++++++-------
 .../event/batch/SubscriptionPipeEventBatch.java    | 21 ++++---
 .../event/batch/SubscriptionPipeEventBatches.java  | 29 +++++++++
 3 files changed, 91 insertions(+), 27 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 086fde0fbcc..ae54a8b8afd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -117,6 +117,7 @@ public abstract class SubscriptionPrefetchingQueue {
   private volatile TsFileInsertionEvent currentTsFileInsertionEvent;
   private volatile RetryableEvent<TabletInsertionEvent> 
currentTabletInsertionEvent;
   private volatile SubscriptionTsFileToTabletIterator currentToTabletIterator;
+  private volatile PipeTerminateEvent currentTerminateEvent;
 
   public SubscriptionPrefetchingQueue(
       final String brokerId,
@@ -175,6 +176,10 @@ public abstract class SubscriptionPrefetchingQueue {
           .clearReferenceCount(this.getClass().getName());
       currentTabletInsertionEvent = null;
     }
+    if (Objects.nonNull(currentTerminateEvent)) {
+      currentTerminateEvent.clearReferenceCount(this.getClass().getName());
+      currentTerminateEvent = null;
+    }
   }
 
   /////////////////////////////////  lock  /////////////////////////////////
@@ -467,6 +472,10 @@ public abstract class SubscriptionPrefetchingQueue {
    * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
    */
   private synchronized void tryPrefetch() {
+    if (Objects.nonNull(currentTerminateEvent) && 
!tryCommitCurrentTerminateEvent()) {
+      return;
+    }
+
     while (!inputPendingQueue.isEmpty() || 
Objects.nonNull(currentTabletInsertionEvent)) {
       if (Objects.nonNull(currentTabletInsertionEvent)) {
         final RetryableState state = 
onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
@@ -497,16 +506,10 @@ public abstract class SubscriptionPrefetchingQueue {
       }
 
       if (event instanceof PipeTerminateEvent) {
-        final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
-        // add mark completed hook
-        terminateEvent.addOnCommittedHook(this::markCompleted);
-        // commit directly
-        ((PipeTerminateEvent) event)
-            
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
-        LOGGER.info(
-            "Subscription: SubscriptionPrefetchingQueue {} commit 
PipeTerminateEvent {}",
-            this,
-            terminateEvent);
+        currentTerminateEvent = (PipeTerminateEvent) event;
+        if (!tryCommitCurrentTerminateEvent()) {
+          return;
+        }
         continue;
       }
 
@@ -549,6 +552,11 @@ public abstract class SubscriptionPrefetchingQueue {
   }
 
   private synchronized void tryPrefetchV2() {
+    if (Objects.nonNull(currentTerminateEvent)) {
+      tryCommitCurrentTerminateEvent();
+      return;
+    }
+
     if (!prefetchingQueue.isEmpty()) {
       return;
     }
@@ -613,16 +621,8 @@ public abstract class SubscriptionPrefetchingQueue {
     }
 
     if (event instanceof PipeTerminateEvent) {
-      final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
-      // add mark completed hook
-      terminateEvent.addOnCommittedHook(this::markCompleted);
-      // commit directly
-      ((PipeTerminateEvent) event)
-          
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
-      LOGGER.info(
-          "Subscription: SubscriptionPrefetchingQueue {} commit 
PipeTerminateEvent {}",
-          this,
-          terminateEvent);
+      currentTerminateEvent = (PipeTerminateEvent) event;
+      tryCommitCurrentTerminateEvent();
       return;
     }
 
@@ -731,6 +731,34 @@ public abstract class SubscriptionPrefetchingQueue {
     return batches.onEvent(this::prefetchEvent);
   }
 
+  private boolean tryCommitCurrentTerminateEvent() {
+    try {
+      batches.emitAll(this::prefetchEvent);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Subscription: SubscriptionPrefetchingQueue {} failed to emit 
remaining events before committing PipeTerminateEvent {}.",
+          this,
+          currentTerminateEvent,
+          e);
+      return false;
+    }
+
+    if (!prefetchingQueue.isEmpty() || !inFlightEvents.isEmpty()) {
+      return false;
+    }
+
+    // Add mark completed hook only when all subscription events have been 
consumed.
+    currentTerminateEvent.addOnCommittedHook(this::markCompleted);
+    currentTerminateEvent.decreaseReferenceCount(
+        SubscriptionPrefetchingQueue.class.getName(), true);
+    LOGGER.info(
+        "Subscription: SubscriptionPrefetchingQueue {} commit 
PipeTerminateEvent {}",
+        this,
+        currentTerminateEvent);
+    currentTerminateEvent = null;
+    return true;
+  }
+
   /////////////////////////////// commit ///////////////////////////////
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index d25573add6b..9dca114c018 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -71,15 +71,22 @@ public abstract class SubscriptionPipeEventBatch {
   protected synchronized boolean onEvent(final Consumer<SubscriptionEvent> 
consumer)
       throws Exception {
     if (shouldEmit() && !enrichedEvents.isEmpty()) {
-      if (Objects.isNull(events)) {
-        events = generateSubscriptionEvents();
-      }
-      if (Objects.nonNull(events)) {
-        events.forEach(consumer);
-        return true;
-      }
+      return emit(consumer);
+    }
+    return false;
+  }
+
+  protected synchronized boolean emit(final Consumer<SubscriptionEvent> 
consumer) throws Exception {
+    if (enrichedEvents.isEmpty()) {
       return false;
     }
+    if (Objects.isNull(events)) {
+      events = generateSubscriptionEvents();
+    }
+    if (Objects.nonNull(events)) {
+      events.forEach(consumer);
+      return true;
+    }
     return false;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
index 467f788f797..381a8bf9ee2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
@@ -90,6 +90,35 @@ public class SubscriptionPipeEventBatches {
     return hasNew.get();
   }
 
+  public boolean emitAll(final Consumer<SubscriptionEvent> consumer) throws 
Exception {
+    final AtomicBoolean hasNew = new AtomicBoolean(false);
+    Exception exception = null;
+    for (final int regionId : ImmutableList.copyOf(regionIdToBatch.keySet())) {
+      try {
+        segmentLock.lock(regionId);
+        final SubscriptionPipeEventBatch batch = regionIdToBatch.get(regionId);
+        if (Objects.isNull(batch)) {
+          continue;
+        }
+        if (batch.emit(consumer)) {
+          hasNew.set(true);
+          regionIdToBatch.remove(regionId);
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            DataNodeMiscMessages.EXCEPTION_SEALING_EVENTS, 
regionIdToBatch.get(regionId), e);
+        exception = e;
+      } finally {
+        segmentLock.unlock(regionId);
+      }
+    }
+
+    if (Objects.nonNull(exception)) {
+      throw exception;
+    }
+    return hasNew.get();
+  }
+
   /**
    * @return {@code true} if there are subscription events consumed.
    */

Reply via email to