This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ca6f5f4c57a Subscription: retain tsfile events in tsfile batch to
avoid premature commit (#15598)
ca6f5f4c57a is described below
commit ca6f5f4c57aff59d553546ffab904c423d5b23cb
Author: VGalaxies <[email protected]>
AuthorDate: Thu May 29 10:46:19 2025 +0800
Subscription: retain tsfile events in tsfile batch to avoid premature
commit (#15598)
Consider the historical data export snapshot scenario:
1. The events delivered upstream are, in order, the tsfile event and the
termination event.
2. The tsfile event is parsed into multiple tablet events, and then the
reference count of the tsfile event is set to 0 (should report as false).
3. Suppose that, for some reason, the tablet events are not sent to the
peer in time, and the reference count of the transfer termination event is
then set to 0 (should report as true).
4. At this point, because the tablet events were not enriched with a commit
id (see "Subscription: fully managed tsfile parsing process for tsfile format
topic", #15524), the termination event successfully marks the corresponding DR
as complete, which in turn leads to data loss.
---
.../event/batch/SubscriptionPipeTsFileEventBatch.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 572a785c41b..e6328a39ef4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -58,12 +58,22 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
@Override
public synchronized void ack() {
batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
+ enrichedEvents.stream()
+ // only decrease reference count for tsfile event, since we already
decrease reference count
+ // for tablet event in batch
+ .filter(event -> event instanceof PipeTsFileInsertionEvent)
+ .forEach(event ->
event.decreaseReferenceCount(this.getClass().getName(), true));
}
@Override
public synchronized void cleanUp(final boolean force) {
// close batch, it includes clearing the reference count of events
batch.close();
+
+ // clear the reference count of events
+ for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+ enrichedEvent.clearReferenceCount(this.getClass().getName());
+ }
enrichedEvents.clear();
}
@@ -103,7 +113,6 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
} finally {
try {
event.close();
- ((PipeTsFileInsertionEvent)
event).decreaseReferenceCount(this.getClass().getName(), false);
} catch (final Exception ignored) {
// no exceptions will be thrown
}