This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 44bb010bd05 Pipe: fix the reference count leak of events in async connector during restart (#14324) (#14439)
44bb010bd05 is described below

commit 44bb010bd05bfc376955e7971103b7ed4cb300ef
Author: VGalaxies <[email protected]>
AuthorDate: Mon Dec 16 16:46:24 2024 +0800

    Pipe: fix the reference count leak of events in async connector during restart (#14324) (#14439)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../subtask/connector/PipeConnectorSubtask.java    |  9 ++
 .../async/IoTDBDataRegionAsyncConnector.java       | 39 ++++++++-
 .../PipeTransferTabletBatchEventHandler.java       | 33 +++++---
 .../PipeTransferTabletInsertNodeEventHandler.java  | 14 ++--
 .../PipeTransferTabletInsertionEventHandler.java   | 36 ++++----
 .../handler/PipeTransferTabletRawEventHandler.java | 15 ++--
 .../handler/PipeTransferTrackableHandler.java      | 98 ++++++++++++++++++++++
 .../async/handler/PipeTransferTsFileHandler.java   | 39 ++++++---
 .../realtime/assigner/PipeDataRegionAssigner.java  | 18 ++--
 .../resource/ref/PipePhantomReferenceManager.java  |  2 +-
 10 files changed, 239 insertions(+), 64 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index cc2d5b5b346..c0470becb3d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -175,6 +175,9 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
       throw new PipeConnectionException(
           "PipeConnector: "
               + outputPipeConnector.getClass().getName()
+              + "(id: "
+              + taskID
+              + ")"
               + " heartbeat failed, or encountered failure when transferring 
generic event. Failure: "
               + e.getMessage(),
           e);
@@ -196,7 +199,13 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
 
     isClosed.set(true);
     try {
+      final long startTime = System.currentTimeMillis();
       outputPipeConnector.close();
+      LOGGER.info(
+          "Pipe: connector subtask {} was closed {} within {} ms",
+          taskID,
+          outputPipeConnector,
+          System.currentTimeMillis() - startTime);
     } catch (final Exception e) {
       LOGGER.info(
           "Exception occurred when closing pipe connector subtask {}, root 
cause: {}",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 7ced0ca87d3..7641eb30f8e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertNodeEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletRawEventHandler;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -67,6 +68,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -87,14 +89,17 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
       "Exception occurred while sending to receiver %s:%s.";
 
-  private IoTDBDataNodeAsyncClientManager clientManager;
-
   private final IoTDBDataRegionSyncConnector retryConnector = new 
IoTDBDataRegionSyncConnector();
   private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
 
+  private IoTDBDataNodeAsyncClientManager clientManager;
+
   private PipeTransferBatchReqBuilder tabletBatchBuilder;
 
+  // use these variables to prevent reference count leaks under some corner cases when closing
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
+  private final Map<PipeTransferTrackableHandler, 
PipeTransferTrackableHandler> pendingHandlers =
+      new ConcurrentHashMap<>();
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -536,12 +541,17 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     isClosed.set(true);
 
     retryConnector.close();
-    clearRetryEventsReferenceCount();
 
     if (tabletBatchBuilder != null) {
       tabletBatchBuilder.close();
     }
 
+    // ensure all on-the-fly handlers have been cleared
+    if (hasPendingHandlers()) {
+      pendingHandlers.forEach((handler, _handler) -> 
handler.clearEventsReferenceCount());
+      pendingHandlers.clear();
+    }
+
     try {
       if (clientManager != null) {
         clientManager.close();
@@ -550,10 +560,13 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       LOGGER.warn("Failed to close client manager.", e);
     }
 
+    // clear reference count of events in retry queue after closing async client
+    clearRetryEventsReferenceCount();
+
     super.close();
   }
 
-  //////////////////////////// APIs provided for metric framework ////////////////////////////
+  //////////////////////// APIs provided for metric framework ////////////////////////
 
   public int getRetryEventQueueSize() {
     return retryEventQueue.size();
@@ -579,4 +592,22 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       return count.get();
     }
   }
+
+  //////////////////////// APIs provided for PipeTransferTrackableHandler ////////////////////////
+
+  public boolean isClosed() {
+    return isClosed.get();
+  }
+
+  public void trackHandler(final PipeTransferTrackableHandler handler) {
+    pendingHandlers.put(handler, handler);
+  }
+
+  public void eliminateHandler(final PipeTransferTrackableHandler handler) {
+    pendingHandlers.remove(handler);
+  }
+
+  public boolean hasPendingHandlers() {
+    return !pendingHandlers.isEmpty();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 6ef424565e8..8370dd667ab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<TPipeTransferResp> {
+public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHandler {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeTransferTabletBatchEventHandler.class);
@@ -54,11 +53,11 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
   private final TPipeTransferReq req;
   private final double reqCompressionRatio;
 
-  private final IoTDBDataRegionAsyncConnector connector;
-
   public PipeTransferTabletBatchEventHandler(
       final PipeTabletEventPlainBatch batch, final 
IoTDBDataRegionAsyncConnector connector)
       throws IOException {
+    super(connector);
+
     // Deep copy to keep events' reference
     events = batch.deepCopyEvents();
     pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
@@ -70,8 +69,6 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
                 uncompressedReq, connector.getCompressors())
             : uncompressedReq;
     reqCompressionRatio = (double) req.getBody().length / 
uncompressedReq.getBody().length;
-
-    this.connector = connector;
   }
 
   public void transfer(final AsyncPipeDataTransferServiceClient client) throws 
TException {
@@ -83,15 +80,15 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
           (long) (entry.getValue() * reqCompressionRatio));
     }
 
-    client.pipeTransfer(req, this);
+    tryTransfer(client, req);
   }
 
   @Override
-  public void onComplete(final TPipeTransferResp response) {
+  protected boolean onCompleteInternal(final TPipeTransferResp response) {
     // Just in case
     if (response == null) {
       onError(new PipeException("TPipeTransferResp is null"));
-      return;
+      return false;
     }
 
     try {
@@ -114,11 +111,14 @@ public class PipeTransferTabletBatchEventHandler 
implements AsyncMethodCallback<
                   PipeTransferTabletBatchEventHandler.class.getName(), true));
     } catch (final Exception e) {
       onError(e);
+      return false;
     }
+
+    return true;
   }
 
   @Override
-  public void onError(final Exception exception) {
+  protected void onErrorInternal(final Exception exception) {
     try {
       LOGGER.warn(
           "Failed to transfer TabletInsertionEvent batch. Total failed events: 
{}, related pipe names: {}",
@@ -129,4 +129,17 @@ public class PipeTransferTabletBatchEventHandler 
implements AsyncMethodCallback<
       connector.addFailureEventsToRetryQueue(events);
     }
   }
+
+  @Override
+  protected void doTransfer(
+      final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
+      throws TException {
+    client.pipeTransfer(req, this);
+  }
+
+  @Override
+  public void clearEventsReferenceCount() {
+    events.forEach(
+        event -> 
event.clearReferenceCount(PipeTransferTabletBatchEventHandler.class.getName()));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 43dfb6aad29..107dccceec7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -24,28 +24,28 @@ import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.thrift.TException;
 
 public class PipeTransferTabletInsertNodeEventHandler
-    extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
+    extends PipeTransferTabletInsertionEventHandler {
 
   public PipeTransferTabletInsertNodeEventHandler(
-      PipeInsertNodeTabletInsertionEvent event,
-      TPipeTransferReq req,
-      IoTDBDataRegionAsyncConnector connector) {
+      final PipeInsertNodeTabletInsertionEvent event,
+      final TPipeTransferReq req,
+      final IoTDBDataRegionAsyncConnector connector) {
     super(event, req, connector);
   }
 
   @Override
-  protected void doTransfer(AsyncPipeDataTransferServiceClient client, 
TPipeTransferReq req)
+  protected void doTransfer(
+      final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
     client.pipeTransfer(req, this);
   }
 
   @Override
-  protected void updateLeaderCache(TSStatus status) {
+  protected void updateLeaderCache(final TSStatus status) {
     connector.updateLeaderCache(
         ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), 
status.getRedirectNode());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 40561121724..f77d41b6662 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -30,12 +30,10 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class PipeTransferTabletInsertionEventHandler<E extends 
TPipeTransferResp>
-    implements AsyncMethodCallback<E> {
+public abstract class PipeTransferTabletInsertionEventHandler extends 
PipeTransferTrackableHandler {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
@@ -43,15 +41,14 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
   protected final TabletInsertionEvent event;
   protected final TPipeTransferReq req;
 
-  protected final IoTDBDataRegionAsyncConnector connector;
-
   protected PipeTransferTabletInsertionEventHandler(
       final TabletInsertionEvent event,
       final TPipeTransferReq req,
       final IoTDBDataRegionAsyncConnector connector) {
+    super(connector);
+
     this.event = event;
     this.req = req;
-    this.connector = connector;
   }
 
   public void transfer(final AsyncPipeDataTransferServiceClient client) throws 
TException {
@@ -63,19 +60,15 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
           req.getBody().length);
     }
 
-    doTransfer(client, req);
+    tryTransfer(client, req);
   }
 
-  protected abstract void doTransfer(
-      final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
-      throws TException;
-
   @Override
-  public void onComplete(final TPipeTransferResp response) {
+  protected boolean onCompleteInternal(final TPipeTransferResp response) {
     // Just in case
     if (response == null) {
       onError(new PipeException("TPipeTransferResp is null"));
-      return;
+      return false;
     }
 
     final TSStatus status = response.getStatus();
@@ -96,13 +89,14 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
       }
     } catch (final Exception e) {
       onError(e);
+      return false;
     }
-  }
 
-  protected abstract void updateLeaderCache(final TSStatus status);
+    return true;
+  }
 
   @Override
-  public void onError(final Exception exception) {
+  protected void onErrorInternal(final Exception exception) {
     try {
       LOGGER.warn(
           "Failed to transfer TabletInsertionEvent {} (committer key={}, 
commit id={}).",
@@ -116,4 +110,14 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
       connector.addFailureEventToRetryQueue(event);
     }
   }
+
+  @Override
+  public void clearEventsReferenceCount() {
+    if (event instanceof EnrichedEvent) {
+      ((EnrichedEvent) event)
+          
.clearReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
+    }
+  }
+
+  protected abstract void updateLeaderCache(final TSStatus status);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index 481b4830d93..e8c5c2c2f07 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -24,28 +24,27 @@ import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.thrift.TException;
 
-public class PipeTransferTabletRawEventHandler
-    extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
+public class PipeTransferTabletRawEventHandler extends 
PipeTransferTabletInsertionEventHandler {
 
   public PipeTransferTabletRawEventHandler(
-      PipeRawTabletInsertionEvent event,
-      TPipeTransferReq req,
-      IoTDBDataRegionAsyncConnector connector) {
+      final PipeRawTabletInsertionEvent event,
+      final TPipeTransferReq req,
+      final IoTDBDataRegionAsyncConnector connector) {
     super(event, req, connector);
   }
 
   @Override
-  protected void doTransfer(AsyncPipeDataTransferServiceClient client, 
TPipeTransferReq req)
+  protected void doTransfer(
+      final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
     client.pipeTransfer(req, this);
   }
 
   @Override
-  protected void updateLeaderCache(TSStatus status) {
+  protected void updateLeaderCache(final TSStatus status) {
     connector.updateLeaderCache(
         ((PipeRawTabletInsertionEvent) event).getDeviceId(), 
status.getRedirectNode());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
new file mode 100644
index 00000000000..033ce5353f5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
+
+import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+
+public abstract class PipeTransferTrackableHandler
+    implements AsyncMethodCallback<TPipeTransferResp> {
+
+  protected final IoTDBDataRegionAsyncConnector connector;
+
+  public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncConnector 
connector) {
+    this.connector = connector;
+  }
+
+  @Override
+  public void onComplete(final TPipeTransferResp response) {
+    if (!connector.isClosed()) {
+      if (onCompleteInternal(response)) {
+        // eliminate handler only when all transmissions corresponding to the handler have been
+        // completed
+        connector.eliminateHandler(this);
+      }
+    } else {
+      clearEventsReferenceCount();
+      connector.eliminateHandler(this);
+    }
+  }
+
+  @Override
+  public void onError(final Exception exception) {
+    if (!connector.isClosed()) {
+      onErrorInternal(exception);
+    } else {
+      clearEventsReferenceCount();
+    }
+    connector.eliminateHandler(this);
+  }
+
+  /**
+   * Attempts to transfer data using the provided client and request.
+   *
+   * @param client the client used for data transfer
+   * @param req the request containing transfer details
+   * @return {@code true} if the transfer was initiated successfully, {@code false} if the connector
+   *     is closed
+   * @throws TException if an error occurs during the transfer
+   */
+  protected boolean tryTransfer(
+      final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
+      throws TException {
+    // track handler before checking if connector is closed
+    connector.trackHandler(this);
+    if (connector.isClosed()) {
+      clearEventsReferenceCount();
+      return false;
+    }
+    doTransfer(client, req);
+    return true;
+  }
+
+  /**
+   * @return {@code true} if all transmissions corresponding to the handler have been completed,
+   *     {@code false} otherwise
+   */
+  protected abstract boolean onCompleteInternal(final TPipeTransferResp 
response);
+
+  protected abstract void onErrorInternal(final Exception exception);
+
+  protected abstract void doTransfer(
+      final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
+      throws TException;
+
+  public abstract void clearEventsReferenceCount();
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index a5f126d7123..2cac135ad0d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,13 +54,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-public class PipeTransferTsFileHandler implements 
AsyncMethodCallback<TPipeTransferResp> {
+public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
 
-  // Used to transfer the file
-  private final IoTDBDataRegionAsyncConnector connector;
-
   // Used to rate limit the transfer
   private final Map<Pair<String, Long>, Double> pipeName2WeightMap;
 
@@ -98,7 +94,7 @@ public class PipeTransferTsFileHandler implements 
AsyncMethodCallback<TPipeTrans
       final File modFile,
       final boolean transferMod)
       throws FileNotFoundException {
-    this.connector = connector;
+    super(connector);
 
     this.pipeName2WeightMap = pipeName2WeightMap;
 
@@ -168,7 +164,9 @@ public class PipeTransferTsFileHandler implements 
AsyncMethodCallback<TPipeTrans
                     client.getEndPoint(),
                     (long) (req.getBody().length * weight)));
 
-        client.pipeTransfer(req, this);
+        if (!tryTransfer(client, req)) {
+          return;
+        }
       }
       return;
     }
@@ -197,13 +195,15 @@ public class PipeTransferTsFileHandler implements 
AsyncMethodCallback<TPipeTrans
                 client.getEndPoint(),
                 (long) (req.getBody().length * weight)));
 
-    client.pipeTransfer(req, this);
+    if (!tryTransfer(client, req)) {
+      return;
+    }
 
     position += readLength;
   }
 
   @Override
-  public void onComplete(final TPipeTransferResp response) {
+  protected boolean onCompleteInternal(final TPipeTransferResp response) {
     if (isSealSignalSent.get()) {
       try {
         final TSStatus status = response.getStatus();
@@ -220,7 +220,7 @@ public class PipeTransferTsFileHandler implements 
AsyncMethodCallback<TPipeTrans
         }
       } catch (final Exception e) {
         onError(e);
-        return;
+        return false;
       }
 
       try {
@@ -262,7 +262,7 @@ public class PipeTransferTsFileHandler implements 
AsyncMethodCallback<TPipeTrans
           client.returnSelf();
         }
       }
-      return;
+      return true;
     }
 
     // If the isSealSignalSent is false, then the response must be a PipeTransferFilePieceResp
@@ -292,11 +292,14 @@ public class PipeTransferTsFileHandler implements 
AsyncMethodCallback<TPipeTrans
       transfer(clientManager, client);
     } catch (final Exception e) {
       onError(e);
+      return false;
     }
+
+    return false; // due to seal transfer not yet completed
   }
 
   @Override
-  public void onError(final Exception exception) {
+  protected void onErrorInternal(final Exception exception) {
     try {
       if (events.size() <= 1 || LOGGER.isDebugEnabled()) {
         LOGGER.warn(
@@ -347,4 +350,16 @@ public class PipeTransferTsFileHandler implements 
AsyncMethodCallback<TPipeTrans
       }
     }
   }
+
+  @Override
+  protected void doTransfer(
+      final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
+      throws TException {
+    client.pipeTransfer(req, this);
+  }
+
+  @Override
+  public void clearEventsReferenceCount() {
+    events.forEach(event -> 
event.clearReferenceCount(PipeTransferTsFileHandler.class.getName()));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index c5f08634df4..3e35a369b9e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -91,10 +91,14 @@ public class PipeDataRegionAssigner implements Closeable {
       ((PipeHeartbeatEvent) innerEvent).onPublished();
     }
 
-    if (!disruptor.isClosed()) {
-      disruptor.publish(event);
-    } else {
-      onAssignedHook(event);
+    // use synchronized here for completely preventing reference count leaks under extreme thread
+    // scheduling when closing
+    synchronized (this) {
+      if (!disruptor.isClosed()) {
+        disruptor.publish(event);
+      } else {
+        onAssignedHook(event);
+      }
     }
   }
 
@@ -216,12 +220,14 @@ public class PipeDataRegionAssigner implements Closeable {
    * should not be used after calling this method.
    */
   @Override
-  public void close() {
+  // use synchronized here for completely preventing reference count leaks under extreme thread
+  // scheduling when closing
+  public synchronized void close() {
     PipeAssignerMetrics.getInstance().deregister(dataRegionId);
-    matcher.clear();
 
     final long startTime = System.currentTimeMillis();
     disruptor.shutdown();
+    matcher.clear();
     LOGGER.info(
         "Pipe: Assigner on data region {} shutdown internal disruptor within 
{} ms",
         dataRegionId,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
index d95616e685d..9f7306ed9af 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
@@ -155,7 +155,7 @@ public abstract class PipePhantomReferenceManager {
       }
 
       if (referenceCount.get() >= 1) {
-        LOGGER.error("PIPE EVENT RESOURCE LEAK DETECTED: {}", holderMessage);
+        LOGGER.error("PIPE EVENT RESOURCE LEAK DETECTED ({}): {}", 
referenceCount, holderMessage);
         finalizeResource();
       }
 

Reply via email to