This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-ref-count-leak
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5292480a0e06caea274d5d70ce5355080ad550df
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Aug 21 20:22:06 2024 +0800

    Pipe: Fix reference count leak when tasks restart
---
 .../task/subtask/connector/PipeConnectorSubtask.java   | 11 +----------
 .../connector/PipeRealtimePriorityBlockingQueue.java   | 17 +++++++++++++----
 .../pipe/task/connection/BlockingPendingQueue.java     | 18 ++++++++++++++----
 3 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index a832d43f73e..6264efd5ef7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -223,16 +223,7 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
    */
   public void discardEventsOfPipe(final String pipeNameToDrop) {
     // Try to remove the events as much as possible
-    inputPendingQueue.removeIf(
-        event -> {
-          if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) {
-            ((EnrichedEvent) event)
-                .clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
-            return true;
-          }
-          return false;
-        });
+    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop);
 
     // synchronized to use the lastEvent and lastExceptionEvent
     synchronized (this) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index a169cb6a338..e710e6db467 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -33,7 +33,6 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQueue<Event> {
 
@@ -153,9 +152,19 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
   }
 
   @Override
-  public void removeIf(final Predicate<? super Event> filter) {
-    super.removeIf(filter);
-    pendingQueue.removeIf(filter);
+  public void discardEventsOfPipe(final String pipeNameToDrop) {
+    super.discardEventsOfPipe(pipeNameToDrop);
+    tsfileInsertEventDeque.removeIf(
+        event -> {
+          if (event instanceof EnrichedEvent
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) {
+            ((EnrichedEvent) event)
+                .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName());
+            eventCounter.decreaseEventCount(event);
+            return true;
+          }
+          return false;
+        });
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
index 04983a984d9..af43dcf9e38 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.task.connection;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -29,7 +30,6 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 public abstract class BlockingPendingQueue<E extends Event> {
 
@@ -40,7 +40,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   protected final BlockingQueue<E> pendingQueue;
 
-  private final PipeEventCounter eventCounter;
+  protected final PipeEventCounter eventCounter;
 
   protected BlockingPendingQueue(
       final BlockingQueue<E> pendingQueue, final PipeEventCounter eventCounter) {
@@ -106,12 +106,22 @@ public abstract class BlockingPendingQueue<E extends Event> {
     eventCounter.reset();
   }
 
+  /** DO NOT FORGET to set eventCounter to new value after invoking this method. */
   public void forEach(final Consumer<? super E> action) {
     pendingQueue.forEach(action);
   }
 
-  public void removeIf(final Predicate<? super E> filter) {
-    pendingQueue.removeIf(filter);
+  public void discardEventsOfPipe(final String pipeNameToDrop) {
+    pendingQueue.removeIf(
+        event -> {
+          if (event instanceof EnrichedEvent
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) {
+            ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName());
+            eventCounter.decreaseEventCount(event);
+            return true;
+          }
+          return false;
+        });
   }
 
   public boolean isEmpty() {

Reply via email to