This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c9ec0fda2d1 Subscription: distinguish between reference count of ack
and clean in tsfile batch to avoid cleaning before ack (#15229)
c9ec0fda2d1 is described below
commit c9ec0fda2d183192a428088c69aff3cdcbbe1acb
Author: VGalaxies <[email protected]>
AuthorDate: Mon Mar 31 10:51:06 2025 +0800
Subscription: distinguish between reference count of ack and clean in
tsfile batch to avoid cleaning before ack (#15229)
---
.../event/batch/SubscriptionPipeTsFileEventBatch.java | 5 +++--
.../event/pipe/SubscriptionPipeTsFileBatchEvents.java | 16 ++++++++++------
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index a1d3e27f6c7..1409de02bf3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -96,13 +96,14 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
final List<SubscriptionEvent> events = new ArrayList<>();
final List<Pair<String, File>> dbTsFilePairs = batch.sealTsFiles();
- final AtomicInteger referenceCount = new
AtomicInteger(dbTsFilePairs.size());
+ final AtomicInteger ackReferenceCount = new
AtomicInteger(dbTsFilePairs.size());
+ final AtomicInteger cleanReferenceCount = new
AtomicInteger(dbTsFilePairs.size());
for (final Pair<String, File> tsFile : dbTsFilePairs) {
final SubscriptionCommitContext commitContext =
prefetchingQueue.generateSubscriptionCommitContext();
events.add(
new SubscriptionEvent(
- new SubscriptionPipeTsFileBatchEvents(this, referenceCount),
+ new SubscriptionPipeTsFileBatchEvents(this, ackReferenceCount,
cleanReferenceCount),
tsFile.right,
commitContext));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 8cf31408ec0..651763ab652 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -28,26 +28,30 @@ import static
com.google.common.base.MoreObjects.toStringHelper;
public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents {
private final SubscriptionPipeTsFileEventBatch batch;
- private final AtomicInteger referenceCount; // shared between the same batch
+ private final AtomicInteger ackReferenceCount; // shared between the same
batch
+ private final AtomicInteger cleanReferenceCount; // shared between the same
batch
private final int count; // snapshot the initial reference count, used for
event count calculation
public SubscriptionPipeTsFileBatchEvents(
- final SubscriptionPipeTsFileEventBatch batch, final AtomicInteger
referenceCount) {
+ final SubscriptionPipeTsFileEventBatch batch,
+ final AtomicInteger ackReferenceCount,
+ final AtomicInteger cleanReferenceCount) {
this.batch = batch;
- this.referenceCount = referenceCount;
- this.count = Math.max(1, referenceCount.get());
+ this.ackReferenceCount = ackReferenceCount;
+ this.cleanReferenceCount = cleanReferenceCount;
+ this.count = Math.max(1, ackReferenceCount.get());
}
@Override
public void ack() {
- if (referenceCount.decrementAndGet() == 0) {
+ if (ackReferenceCount.decrementAndGet() == 0) {
batch.ack();
}
}
@Override
public void cleanUp(final boolean force) {
- if (referenceCount.decrementAndGet() == 0) {
+ if (cleanReferenceCount.decrementAndGet() == 0) {
batch.cleanUp(force);
}
}