This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 97657f9a554 Subscription: add parameter to the clean up method to distinguish the force close situation & fix reference count management for iteration snapshot (#14966) (#14984)
97657f9a554 is described below

commit 97657f9a5546933e1e34f6dad1868bcc96ac07d0
Author: VGalaxies <[email protected]>
AuthorDate: Fri Feb 28 13:22:37 2025 +0800

    Subscription: add parameter to the clean up method to distinguish the force close situation & fix reference count management for iteration snapshot (#14966) (#14984)
---
 .../broker/SubscriptionPrefetchingQueue.java           | 10 +++++-----
 .../broker/SubscriptionPrefetchingTabletQueue.java     |  2 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java     |  2 +-
 .../iotdb/db/subscription/event/SubscriptionEvent.java |  4 ++--
 .../event/batch/SubscriptionPipeEventBatch.java        |  2 +-
 .../event/batch/SubscriptionPipeEventBatches.java      |  2 +-
 .../event/batch/SubscriptionPipeTabletEventBatch.java  |  6 +++---
 .../batch/SubscriptionPipeTabletIterationSnapshot.java | 18 +++++++++++++++++-
 .../event/batch/SubscriptionPipeTsFileEventBatch.java  |  2 +-
 .../event/pipe/SubscriptionPipeEmptyEvent.java         |  2 +-
 .../event/pipe/SubscriptionPipeEvents.java             |  2 +-
 .../event/pipe/SubscriptionPipeTabletBatchEvents.java  | 11 ++++++++---
 .../event/pipe/SubscriptionPipeTsFileBatchEvents.java  |  4 ++--
 .../event/pipe/SubscriptionPipeTsFilePlainEvent.java   |  2 +-
 14 files changed, 45 insertions(+), 24 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index f0d6d752398..0514ac0075e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -136,11 +136,11 @@ public abstract class SubscriptionPrefetchingQueue {
     batches.cleanUp();
 
     // clean up events in prefetchingQueue
-    prefetchingQueue.forEach(SubscriptionEvent::cleanUp);
+    prefetchingQueue.forEach(event -> event.cleanUp(true));
     prefetchingQueue.clear();
 
     // clean up events in inFlightEvents
-    inFlightEvents.values().forEach(SubscriptionEvent::cleanUp);
+    inFlightEvents.values().forEach(event -> event.cleanUp(true));
     inFlightEvents.clear();
 
     // no need to clean up events in inputPendingQueue, see
@@ -447,7 +447,7 @@ public abstract class SubscriptionPrefetchingQueue {
                 commitContext,
                 this);
             // clean up committed event
-            ev.cleanUp();
+            ev.cleanUp(false);
             return null; // remove this entry
           }
 
@@ -477,7 +477,7 @@ public abstract class SubscriptionPrefetchingQueue {
           acked.set(true);
 
           // clean up committed event
-          ev.cleanUp();
+          ev.cleanUp(false);
           return null; // remove this entry
         });
 
@@ -668,7 +668,7 @@ public abstract class SubscriptionPrefetchingQueue {
   private final RemappingFunction<SubscriptionEvent> committedCleaner =
       (ev) -> {
         if (ev.isCommitted()) {
-          ev.cleanUp();
+          ev.cleanUp(false);
           return null; // remove this entry
         }
         return ev;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 79374755f97..4b7a166b922 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -88,7 +88,7 @@ public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQ
           }
 
           if (ev.isCommitted()) {
-            ev.cleanUp();
+            ev.cleanUp(false);
             final String errorMessage =
                 String.format(
                     "outdated poll request after commit, consumer id: %s, commit context: %s, offset: %s, prefetching queue: %s",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index f2961afd7f5..98156972353 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -94,7 +94,7 @@ public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQ
           }
 
           if (ev.isCommitted()) {
-            ev.cleanUp();
+            ev.cleanUp(false);
             final String errorMessage =
                 String.format(
                     "outdated poll request after commit, consumer id: %s, commit context: %s, writing offset: %s, prefetching queue: %s",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index acf47ca8a51..90bc9c603d5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -181,8 +181,8 @@ public class SubscriptionEvent implements Comparable<SubscriptionEvent> {
    * {@link ConcurrentHashMap#compute} method of inFlightEvents in {@link
    * SubscriptionPrefetchingQueue} or {@link SubscriptionPrefetchingQueue#cleanUp}.
    */
-  public void cleanUp() {
-    pipeEvents.cleanUp();
+  public void cleanUp(final boolean force) {
+    pipeEvents.cleanUp(force);
     response.cleanUp();
 
     // TODO: clean more fields
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index 5f2b0e069a6..fe1b280f16c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -62,7 +62,7 @@ public abstract class SubscriptionPipeEventBatch {
 
   public abstract void ack();
 
-  public abstract void cleanUp();
+  public abstract void cleanUp(final boolean force);
 
   /////////////////////////////// APIs ///////////////////////////////
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
index 36a4e9a1c8a..c20dffcd5e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
@@ -136,7 +136,7 @@ public class SubscriptionPipeEventBatches {
   }
 
   public void cleanUp() {
-    regionIdToBatch.values().forEach(SubscriptionPipeEventBatch::cleanUp);
+    regionIdToBatch.values().forEach(batch -> batch.cleanUp(true));
     regionIdToBatch.clear();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index a5628aca8c6..d2cbf72be0e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -101,9 +101,9 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
   }
 
   @Override
-  public synchronized void cleanUp() {
+  public synchronized void cleanUp(final boolean force) {
     // do nothing if it has next or still referenced by unacked response
-    if (hasNext() || referenceCount.get() != 0) {
+    if (!force && (hasNext() || referenceCount.get() != 0)) {
       return;
     }
 
@@ -226,7 +226,7 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
     currentTsFileInsertionEvent = null;
 
     if (Objects.nonNull(iterationSnapshot)) {
-      iterationSnapshot.clear();
+      iterationSnapshot.cleanUp();
     }
     iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
     referenceCount.set(0);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
index 137354ae36b..a7466e9070f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
@@ -44,7 +44,7 @@ public class SubscriptionPipeTabletIterationSnapshot {
     parsedEnrichedEvents.add(enrichedEvent);
   }
 
-  public void clear() {
+  public void ack() {
     for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
       if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
         // close data container in tsfile event
@@ -59,4 +59,20 @@ public class SubscriptionPipeTabletIterationSnapshot {
       }
     }
   }
+
+  public void cleanUp() {
+    for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
+      if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+        // close data container in tsfile event
+        ((PipeTsFileInsertionEvent) enrichedEvent).close();
+      }
+    }
+
+    for (final EnrichedEvent enrichedEvent : parsedEnrichedEvents) {
+      if (enrichedEvent instanceof PipeRawTabletInsertionEvent) {
+        // clear reference count in raw tablet event
+        enrichedEvent.clearReferenceCount(this.getClass().getName());
+      }
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 6148cb1a060..71671d55d95 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -58,7 +58,7 @@ public class SubscriptionPipeTsFileEventBatch extends SubscriptionPipeEventBatch
   }
 
   @Override
-  public synchronized void cleanUp() {
+  public synchronized void cleanUp(final boolean force) {
     // close batch, it includes clearing the reference count of events
     batch.close();
     enrichedEvents.clear();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
index a2b9b3c3db9..df016b3a39c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
@@ -27,7 +27,7 @@ public class SubscriptionPipeEmptyEvent implements SubscriptionPipeEvents {
   public void ack() {}
 
   @Override
-  public void cleanUp() {}
+  public void cleanUp(final boolean force) {}
 
   /////////////////////////////// stringify ///////////////////////////////
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
index 05f011699ec..5c7006df7a2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
@@ -23,7 +23,7 @@ public interface SubscriptionPipeEvents {
 
   void ack();
 
-  void cleanUp();
+  void cleanUp(final boolean force);
 
   //////////////////////////// APIs provided for metric framework ////////////////////////////
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index 9bf87c6b1be..f96714921e4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -46,12 +46,17 @@ public class SubscriptionPipeTabletBatchEvents implements SubscriptionPipeEvents
   @Override
   public void ack() {
     batch.ack();
-    iterationSnapshot.clear();
+    if (Objects.nonNull(iterationSnapshot)) {
+      iterationSnapshot.ack();
+    }
   }
 
   @Override
-  public void cleanUp() {
-    batch.cleanUp();
+  public void cleanUp(final boolean force) {
+    batch.cleanUp(force);
+    if (Objects.nonNull(iterationSnapshot)) {
+      iterationSnapshot.cleanUp();
+    }
   }
 
   /////////////////////////////// stringify ///////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 8b98f1b758f..8cf31408ec0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -46,9 +46,9 @@ public class SubscriptionPipeTsFileBatchEvents implements SubscriptionPipeEvents
   }
 
   @Override
-  public void cleanUp() {
+  public void cleanUp(final boolean force) {
     if (referenceCount.decrementAndGet() == 0) {
-      batch.cleanUp();
+      batch.cleanUp(force);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
index 453e44be965..42dc20eb7bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
@@ -37,7 +37,7 @@ public class SubscriptionPipeTsFilePlainEvent implements SubscriptionPipeEvents
   }
 
   @Override
-  public void cleanUp() {
+  public void cleanUp(final boolean force) {
     // close data container in tsfile event
     tsFileInsertionEvent.close();
     // clear the reference count of event

Reply via email to