This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ca6f5f4c57a Subscription: retain tsfile events in tsfile batch to 
avoid premature commit (#15598)
ca6f5f4c57a is described below

commit ca6f5f4c57aff59d553546ffab904c423d5b23cb
Author: VGalaxies <[email protected]>
AuthorDate: Thu May 29 10:46:19 2025 +0800

    Subscription: retain tsfile events in tsfile batch to avoid premature 
commit (#15598)
    
    Consider the historical data export snapshot scenario:
    
    1. The events delivered upstream are, in order, the tsfile event and the 
termination event.
    2. The tsfile event is parsed into multiple tablet events, and then the 
reference count of the tsfile event is set to 0 (should report as false).
    3. Suppose that, for some reason, the tablet events are not sent to the 
peer in time, and the reference count of the transfer termination event is then 
set to 0 (should report as true).
    4. At this point, because the tablet events were not enriched with a commit 
id (see Subscription: fully managed tsfile parsing process for tsfile format 
topic #15524), the termination event successfully marks the corresponding DR 
complete, which in turn leads to data loss.
---
 .../event/batch/SubscriptionPipeTsFileEventBatch.java         | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 572a785c41b..e6328a39ef4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -58,12 +58,22 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
   @Override
   public synchronized void ack() {
     batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
+    enrichedEvents.stream()
+        // only decrease reference count for tsfile event, since we already 
decrease reference count
+        // for tablet event in batch
+        .filter(event -> event instanceof PipeTsFileInsertionEvent)
+        .forEach(event -> 
event.decreaseReferenceCount(this.getClass().getName(), true));
   }
 
   @Override
   public synchronized void cleanUp(final boolean force) {
     // close batch, it includes clearing the reference count of events
     batch.close();
+
+    // clear the reference count of events
+    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+      enrichedEvent.clearReferenceCount(this.getClass().getName());
+    }
     enrichedEvents.clear();
   }
 
@@ -103,7 +113,6 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
     } finally {
       try {
         event.close();
-        ((PipeTsFileInsertionEvent) 
event).decreaseReferenceCount(this.getClass().getName(), false);
       } catch (final Exception ignored) {
         // no exceptions will be thrown
       }

Reply via email to