This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-ref-count-leak
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5292480a0e06caea274d5d70ce5355080ad550df
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Aug 21 20:22:06 2024 +0800

    Pipe: Fix reference count leak when tasks restart
---
 .../task/subtask/connector/PipeConnectorSubtask.java | 11 +----------
 .../connector/PipeRealtimePriorityBlockingQueue.java | 17 +++++++++++++----
 .../pipe/task/connection/BlockingPendingQueue.java   | 18 ++++++++++++++----
 3 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index a832d43f73e..6264efd5ef7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -223,16 +223,7 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
    */
   public void discardEventsOfPipe(final String pipeNameToDrop) {
     // Try to remove the events as much as possible
-    inputPendingQueue.removeIf(
-        event -> {
-          if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) {
-            ((EnrichedEvent) event)
-                .clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
-            return true;
-          }
-          return false;
-        });
+    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop);
 
     // synchronized to use the lastEvent and lastExceptionEvent
     synchronized (this) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index a169cb6a338..e710e6db467 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -33,7 +33,6 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQueue<Event> {
 
@@ -153,9 +152,19 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
   }
 
   @Override
-  public void removeIf(final Predicate<? super Event> filter) {
-    super.removeIf(filter);
-    pendingQueue.removeIf(filter);
+  public void discardEventsOfPipe(final String pipeNameToDrop) {
+    super.discardEventsOfPipe(pipeNameToDrop);
+    tsfileInsertEventDeque.removeIf(
+        event -> {
+          if (event instanceof EnrichedEvent
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) {
+            ((EnrichedEvent) event)
+                .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName());
+            eventCounter.decreaseEventCount(event);
+            return true;
+          }
+          return false;
+        });
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
index 04983a984d9..af43dcf9e38 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.task.connection;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -29,7 +30,6 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 public abstract class BlockingPendingQueue<E extends Event> {
 
@@ -40,7 +40,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   protected final BlockingQueue<E> pendingQueue;
 
-  private final PipeEventCounter eventCounter;
+  protected final PipeEventCounter eventCounter;
 
   protected BlockingPendingQueue(
       final BlockingQueue<E> pendingQueue, final PipeEventCounter eventCounter) {
@@ -106,12 +106,22 @@ public abstract class BlockingPendingQueue<E extends Event> {
     eventCounter.reset();
   }
 
+  /** DO NOT FORGET to set eventCounter to new value after invoking this method. */
   public void forEach(final Consumer<? super E> action) {
     pendingQueue.forEach(action);
   }
 
-  public void removeIf(final Predicate<? super E> filter) {
-    pendingQueue.removeIf(filter);
+  public void discardEventsOfPipe(final String pipeNameToDrop) {
+    pendingQueue.removeIf(
+        event -> {
+          if (event instanceof EnrichedEvent
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) {
+            ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName());
+            eventCounter.decreaseEventCount(event);
+            return true;
+          }
+          return false;
+        });
   }
 
   public boolean isEmpty() {
