This is an automated email from the ASF dual-hosted git repository. VGalaxies pushed a commit to branch fix/export-tsfile-drain-before-termination in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c3c92b39dceb8d932d5f09c399ed43ce16e666cc Author: VGalaxies <[email protected]> AuthorDate: Fri Jun 12 04:31:15 2026 +0000 Subscription: drain tsfile batches before termination --- .../broker/SubscriptionPrefetchingQueue.java | 68 +++++++++++++++------- .../event/batch/SubscriptionPipeEventBatch.java | 21 ++++--- .../event/batch/SubscriptionPipeEventBatches.java | 29 +++++++++ 3 files changed, 91 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 086fde0fbcc..ae54a8b8afd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -117,6 +117,7 @@ public abstract class SubscriptionPrefetchingQueue { private volatile TsFileInsertionEvent currentTsFileInsertionEvent; private volatile RetryableEvent<TabletInsertionEvent> currentTabletInsertionEvent; private volatile SubscriptionTsFileToTabletIterator currentToTabletIterator; + private volatile PipeTerminateEvent currentTerminateEvent; public SubscriptionPrefetchingQueue( final String brokerId, @@ -175,6 +176,10 @@ public abstract class SubscriptionPrefetchingQueue { .clearReferenceCount(this.getClass().getName()); currentTabletInsertionEvent = null; } + if (Objects.nonNull(currentTerminateEvent)) { + currentTerminateEvent.clearReferenceCount(this.getClass().getName()); + currentTerminateEvent = null; + } } ///////////////////////////////// lock ///////////////////////////////// @@ -467,6 +472,10 @@ public abstract class SubscriptionPrefetchingQueue { * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty. */ private synchronized void tryPrefetch() { + if (Objects.nonNull(currentTerminateEvent) && !tryCommitCurrentTerminateEvent()) { + return; + } + while (!inputPendingQueue.isEmpty() || Objects.nonNull(currentTabletInsertionEvent)) { if (Objects.nonNull(currentTabletInsertionEvent)) { final RetryableState state = onRetryableTabletInsertionEvent(currentTabletInsertionEvent); @@ -497,16 +506,10 @@ public abstract class SubscriptionPrefetchingQueue { } if (event instanceof PipeTerminateEvent) { - final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event; - // add mark completed hook - terminateEvent.addOnCommittedHook(this::markCompleted); - // commit directly - ((PipeTerminateEvent) event) - .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true); - LOGGER.info( - "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", - this, - terminateEvent); + currentTerminateEvent = (PipeTerminateEvent) event; + if (!tryCommitCurrentTerminateEvent()) { + return; + } continue; } @@ -549,6 +552,11 @@ public abstract class SubscriptionPrefetchingQueue { } private synchronized void tryPrefetchV2() { + if (Objects.nonNull(currentTerminateEvent)) { + tryCommitCurrentTerminateEvent(); + return; + } + if (!prefetchingQueue.isEmpty()) { return; } @@ -613,16 +621,8 @@ public abstract class SubscriptionPrefetchingQueue { } if (event instanceof PipeTerminateEvent) { - final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event; - // add mark completed hook - terminateEvent.addOnCommittedHook(this::markCompleted); - // commit directly - ((PipeTerminateEvent) event) - .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true); - LOGGER.info( - "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", - this, - terminateEvent); + currentTerminateEvent = (PipeTerminateEvent) event; + tryCommitCurrentTerminateEvent(); return; } @@ -731,6 +731,34 @@ public abstract class SubscriptionPrefetchingQueue { return batches.onEvent(this::prefetchEvent); } + private boolean tryCommitCurrentTerminateEvent() { + try { + batches.emitAll(this::prefetchEvent); + } catch (final Exception e) { + LOGGER.warn( + "Subscription: SubscriptionPrefetchingQueue {} failed to emit remaining events before committing PipeTerminateEvent {}.", + this, + currentTerminateEvent, + e); + return false; + } + + if (!prefetchingQueue.isEmpty() || !inFlightEvents.isEmpty()) { + return false; + } + + // Add mark completed hook only when all subscription events have been consumed. + currentTerminateEvent.addOnCommittedHook(this::markCompleted); + currentTerminateEvent.decreaseReferenceCount( + SubscriptionPrefetchingQueue.class.getName(), true); + LOGGER.info( + "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", + this, + currentTerminateEvent); + currentTerminateEvent = null; + return true; + } + /////////////////////////////// commit /////////////////////////////// /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java index d25573add6b..9dca114c018 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java @@ -71,15 +71,22 @@ public abstract class SubscriptionPipeEventBatch { protected synchronized boolean onEvent(final Consumer<SubscriptionEvent> consumer) throws Exception { if (shouldEmit() && !enrichedEvents.isEmpty()) { - if (Objects.isNull(events)) { - events = generateSubscriptionEvents(); - } - if (Objects.nonNull(events)) { - events.forEach(consumer); - return true; - } + return emit(consumer); + } + return false; + } + + protected synchronized boolean emit(final Consumer<SubscriptionEvent> consumer) throws Exception { + if (enrichedEvents.isEmpty()) { return false; } + if (Objects.isNull(events)) { + events = generateSubscriptionEvents(); + } + if (Objects.nonNull(events)) { + events.forEach(consumer); + return true; + } return false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java index 467f788f797..381a8bf9ee2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java @@ -90,6 +90,35 @@ public class SubscriptionPipeEventBatches { return hasNew.get(); } + public boolean emitAll(final Consumer<SubscriptionEvent> consumer) throws Exception { + final AtomicBoolean hasNew = new AtomicBoolean(false); + Exception exception = null; + for (final int regionId : ImmutableList.copyOf(regionIdToBatch.keySet())) { + try { + segmentLock.lock(regionId); + final SubscriptionPipeEventBatch batch = regionIdToBatch.get(regionId); + if (Objects.isNull(batch)) { + continue; + } + if (batch.emit(consumer)) { + hasNew.set(true); + regionIdToBatch.remove(regionId); + } + } catch (final Exception e) { + LOGGER.warn( + DataNodeMiscMessages.EXCEPTION_SEALING_EVENTS, regionIdToBatch.get(regionId), e); + exception = e; + } finally { + segmentLock.unlock(regionId); + } + } + + if (Objects.nonNull(exception)) { + throw exception; + } + return hasNew.get(); + } + /** * @return {@code true} if there are subscription events consumed. */
