This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 97657f9a554 Subscription: add parameter to the clean up method to
distinguish the force close situation & fix reference count management for
iteration snapshot (#14966) (#14984)
97657f9a554 is described below
commit 97657f9a5546933e1e34f6dad1868bcc96ac07d0
Author: VGalaxies <[email protected]>
AuthorDate: Fri Feb 28 13:22:37 2025 +0800
Subscription: add parameter to the clean up method to distinguish the force
close situation & fix reference count management for iteration snapshot
(#14966) (#14984)
---
.../broker/SubscriptionPrefetchingQueue.java | 10 +++++-----
.../broker/SubscriptionPrefetchingTabletQueue.java | 2 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 2 +-
.../iotdb/db/subscription/event/SubscriptionEvent.java | 4 ++--
.../event/batch/SubscriptionPipeEventBatch.java | 2 +-
.../event/batch/SubscriptionPipeEventBatches.java | 2 +-
.../event/batch/SubscriptionPipeTabletEventBatch.java | 6 +++---
.../batch/SubscriptionPipeTabletIterationSnapshot.java | 18 +++++++++++++++++-
.../event/batch/SubscriptionPipeTsFileEventBatch.java | 2 +-
.../event/pipe/SubscriptionPipeEmptyEvent.java | 2 +-
.../event/pipe/SubscriptionPipeEvents.java | 2 +-
.../event/pipe/SubscriptionPipeTabletBatchEvents.java | 11 ++++++++---
.../event/pipe/SubscriptionPipeTsFileBatchEvents.java | 4 ++--
.../event/pipe/SubscriptionPipeTsFilePlainEvent.java | 2 +-
14 files changed, 45 insertions(+), 24 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index f0d6d752398..0514ac0075e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -136,11 +136,11 @@ public abstract class SubscriptionPrefetchingQueue {
batches.cleanUp();
// clean up events in prefetchingQueue
- prefetchingQueue.forEach(SubscriptionEvent::cleanUp);
+ prefetchingQueue.forEach(event -> event.cleanUp(true));
prefetchingQueue.clear();
// clean up events in inFlightEvents
- inFlightEvents.values().forEach(SubscriptionEvent::cleanUp);
+ inFlightEvents.values().forEach(event -> event.cleanUp(true));
inFlightEvents.clear();
// no need to clean up events in inputPendingQueue, see
@@ -447,7 +447,7 @@ public abstract class SubscriptionPrefetchingQueue {
commitContext,
this);
// clean up committed event
- ev.cleanUp();
+ ev.cleanUp(false);
return null; // remove this entry
}
@@ -477,7 +477,7 @@ public abstract class SubscriptionPrefetchingQueue {
acked.set(true);
// clean up committed event
- ev.cleanUp();
+ ev.cleanUp(false);
return null; // remove this entry
});
@@ -668,7 +668,7 @@ public abstract class SubscriptionPrefetchingQueue {
private final RemappingFunction<SubscriptionEvent> committedCleaner =
(ev) -> {
if (ev.isCommitted()) {
- ev.cleanUp();
+ ev.cleanUp(false);
return null; // remove this entry
}
return ev;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 79374755f97..4b7a166b922 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -88,7 +88,7 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
}
if (ev.isCommitted()) {
- ev.cleanUp();
+ ev.cleanUp(false);
final String errorMessage =
String.format(
"outdated poll request after commit, consumer id: %s,
commit context: %s, offset: %s, prefetching queue: %s",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index f2961afd7f5..98156972353 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -94,7 +94,7 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
}
if (ev.isCommitted()) {
- ev.cleanUp();
+ ev.cleanUp(false);
final String errorMessage =
String.format(
"outdated poll request after commit, consumer id: %s,
commit context: %s, writing offset: %s, prefetching queue: %s",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index acf47ca8a51..90bc9c603d5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -181,8 +181,8 @@ public class SubscriptionEvent implements
Comparable<SubscriptionEvent> {
* {@link ConcurrentHashMap#compute} method of inFlightEvents in {@link
* SubscriptionPrefetchingQueue} or {@link
SubscriptionPrefetchingQueue#cleanUp}.
*/
- public void cleanUp() {
- pipeEvents.cleanUp();
+ public void cleanUp(final boolean force) {
+ pipeEvents.cleanUp(force);
response.cleanUp();
// TODO: clean more fields
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index 5f2b0e069a6..fe1b280f16c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -62,7 +62,7 @@ public abstract class SubscriptionPipeEventBatch {
public abstract void ack();
- public abstract void cleanUp();
+ public abstract void cleanUp(final boolean force);
/////////////////////////////// APIs ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
index 36a4e9a1c8a..c20dffcd5e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
@@ -136,7 +136,7 @@ public class SubscriptionPipeEventBatches {
}
public void cleanUp() {
- regionIdToBatch.values().forEach(SubscriptionPipeEventBatch::cleanUp);
+ regionIdToBatch.values().forEach(batch -> batch.cleanUp(true));
regionIdToBatch.clear();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index a5628aca8c6..d2cbf72be0e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -101,9 +101,9 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
}
@Override
- public synchronized void cleanUp() {
+ public synchronized void cleanUp(final boolean force) {
// do nothing if it has next or still referenced by unacked response
- if (hasNext() || referenceCount.get() != 0) {
+ if (!force && (hasNext() || referenceCount.get() != 0)) {
return;
}
@@ -226,7 +226,7 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
currentTsFileInsertionEvent = null;
if (Objects.nonNull(iterationSnapshot)) {
- iterationSnapshot.clear();
+ iterationSnapshot.cleanUp();
}
iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
referenceCount.set(0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
index 137354ae36b..a7466e9070f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
@@ -44,7 +44,7 @@ public class SubscriptionPipeTabletIterationSnapshot {
parsedEnrichedEvents.add(enrichedEvent);
}
- public void clear() {
+ public void ack() {
for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
// close data container in tsfile event
@@ -59,4 +59,20 @@ public class SubscriptionPipeTabletIterationSnapshot {
}
}
}
+
+ public void cleanUp() {
+ for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
+ if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+ // close data container in tsfile event
+ ((PipeTsFileInsertionEvent) enrichedEvent).close();
+ }
+ }
+
+ for (final EnrichedEvent enrichedEvent : parsedEnrichedEvents) {
+ if (enrichedEvent instanceof PipeRawTabletInsertionEvent) {
+ // clear reference count in raw tablet event
+ enrichedEvent.clearReferenceCount(this.getClass().getName());
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 6148cb1a060..71671d55d95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -58,7 +58,7 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
}
@Override
- public synchronized void cleanUp() {
+ public synchronized void cleanUp(final boolean force) {
// close batch, it includes clearing the reference count of events
batch.close();
enrichedEvents.clear();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
index a2b9b3c3db9..df016b3a39c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
@@ -27,7 +27,7 @@ public class SubscriptionPipeEmptyEvent implements
SubscriptionPipeEvents {
public void ack() {}
@Override
- public void cleanUp() {}
+ public void cleanUp(final boolean force) {}
/////////////////////////////// stringify ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
index 05f011699ec..5c7006df7a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
@@ -23,7 +23,7 @@ public interface SubscriptionPipeEvents {
void ack();
- void cleanUp();
+ void cleanUp(final boolean force);
//////////////////////////// APIs provided for metric framework
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index 9bf87c6b1be..f96714921e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -46,12 +46,17 @@ public class SubscriptionPipeTabletBatchEvents implements
SubscriptionPipeEvents
@Override
public void ack() {
batch.ack();
- iterationSnapshot.clear();
+ if (Objects.nonNull(iterationSnapshot)) {
+ iterationSnapshot.ack();
+ }
}
@Override
- public void cleanUp() {
- batch.cleanUp();
+ public void cleanUp(final boolean force) {
+ batch.cleanUp(force);
+ if (Objects.nonNull(iterationSnapshot)) {
+ iterationSnapshot.cleanUp();
+ }
}
/////////////////////////////// stringify ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 8b98f1b758f..8cf31408ec0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -46,9 +46,9 @@ public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents
}
@Override
- public void cleanUp() {
+ public void cleanUp(final boolean force) {
if (referenceCount.decrementAndGet() == 0) {
- batch.cleanUp();
+ batch.cleanUp(force);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
index 453e44be965..42dc20eb7bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
@@ -37,7 +37,7 @@ public class SubscriptionPipeTsFilePlainEvent implements
SubscriptionPipeEvents
}
@Override
- public void cleanUp() {
+ public void cleanUp(final boolean force) {
// close data container in tsfile event
tsFileInsertionEvent.close();
// clear the reference count of event