This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 2783e801f19 [To dev/1.3] Pipe: release resources of pipe event that 
have been GCed but the reference count is not zero by PhantomReference (#13360, 
#13756, #13813, #13962, part of #13992) (#14355)
2783e801f19 is described below

commit 2783e801f1951fb4643aa220112bb3171d109352
Author: VGalaxies <[email protected]>
AuthorDate: Sat Dec 7 02:21:46 2024 +0800

    [To dev/1.3] Pipe: release resources of pipe event that have been GCed but 
the reference count is not zero by PhantomReference (#13360, #13756, #13813, 
#13962, part of #13992) (#14355)
    
    * Pipe: release resources of pipe event that have been GCed but the 
reference count is not zero by PhantomReference (#13360)
    
    * Pipe: remove protected method reference under jdk8 & add GH yml for 
checking compatibility issues (#13756)
    
    * Pipe: add separated thread pool for phantom reference clean job (#13813)
    
    * Pipe: reduce pipe phantom reference logging frequency by omitting 
printing if count unchanged (#13962)
    
    * cherry-pick part of #13992
---
 .github/workflows/compile-check.yml                |  52 +++++++
 .../agent/runtime/PipeConfigNodeRuntimeAgent.java  |  29 ++++
 .../pipe/event/PipeConfigRegionSnapshotEvent.java  |  57 ++++++-
 .../manager/pipe/metric/PipeConfigNodeMetrics.java |   2 +
 .../pipe/metric/PipeConfigNodeResourceMetrics.java |  69 +++++++++
 .../resource/PipeConfigNodeResourceManager.java    |   8 +
 .../ref/PipeConfigNodePhantomReferenceManager.java |  38 +++++
 .../agent/runtime/PipeDataNodeRuntimeAgent.java    |  14 ++
 .../db/pipe/event/ReferenceTrackableEvent.java     |  27 ++++
 .../schema/PipeSchemaRegionSnapshotEvent.java      |  60 +++++++-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  43 +++++-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  39 ++++-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  51 ++++++-
 .../iotdb/db/pipe/metric/PipeResourceMetrics.java  |   9 ++
 .../pipe/resource/PipeDataNodeResourceManager.java |   8 +
 .../ref/PipeDataNodePhantomReferenceManager.java   |  38 +++++
 .../event/cache/SubscriptionPollResponseCache.java |   4 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   2 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  20 +++
 .../iotdb/commons/conf/CommonDescriptor.java       |  11 ++
 .../AbstractPipePeriodicalJobExecutor.java}        |  45 +++---
 .../agent/runtime/PipePeriodicalJobExecutor.java   |  39 +++++
 .../PipePeriodicalPhantomReferenceCleaner.java     |  34 +++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  14 +-
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |  10 +-
 .../resource/ref/PipePhantomReferenceManager.java  | 168 +++++++++++++++++++++
 .../iotdb/commons/service/metric/enums/Metric.java |   1 +
 27 files changed, 854 insertions(+), 38 deletions(-)

diff --git a/.github/workflows/compile-check.yml 
b/.github/workflows/compile-check.yml
new file mode 100644
index 00000000000..b20e8edd999
--- /dev/null
+++ b/.github/workflows/compile-check.yml
@@ -0,0 +1,52 @@
+#  This workflow will compile IoTDB under jdk8 to check for compatibility 
issues
+
+name: Compile Check
+
+on:
+  push:
+    branches:
+      - master
+      - 'rel/1.*'
+      - 'rc/1.*'
+    paths-ignore:
+      - 'docs/**'
+      - 'site/**'
+  pull_request:
+    branches:
+      - master
+      - 'rel/1.*'
+      - 'rc/1.*'
+    paths-ignore:
+      - 'docs/**'
+      - 'site/**'
+  # allow running the action manually:
+  workflow_dispatch:
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+env:
+  MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false 
-Dmaven.wagon.http.retryHandler.class=standard 
-Dmaven.wagon.http.retryHandler.count=3
+  MAVEN_ARGS: --batch-mode --no-transfer-progress
+  DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+
+jobs:
+  compile-check:
+    strategy:
+      fail-fast: false
+      matrix:
+        java: [8]
+        os: [ ubuntu-latest ]
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up JDK ${{ matrix.java }}
+        uses: actions/setup-java@v4
+        with:
+          distribution: liberica
+          java-version: ${{ matrix.java }}
+      - name: Compiler Test
+        shell: bash
+        run: |
+          mvn clean compile -P with-integration-tests -ntp
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
index cc6056522ef..b89e401d5c7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
@@ -22,6 +22,8 @@ package 
org.apache.iotdb.confignode.manager.pipe.agent.runtime;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import 
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -45,6 +47,12 @@ public class PipeConfigNodeRuntimeAgent implements IService {
 
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
+  private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
+      new PipePeriodicalJobExecutor();
+
+  private final PipePeriodicalPhantomReferenceCleaner 
pipePeriodicalPhantomReferenceCleaner =
+      new PipePeriodicalPhantomReferenceCleaner();
+
   @Override
   public synchronized void start() {
     PipeConfig.getInstance().printAllConfigs();
@@ -58,6 +66,13 @@ public class PipeConfigNodeRuntimeAgent implements IService {
     // Clean receiver file dir
     PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();
 
+    // Start periodical job executor
+    pipePeriodicalJobExecutor.start();
+
+    if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
+      pipePeriodicalPhantomReferenceCleaner.start();
+    }
+
     isShutdown.set(false);
     LOGGER.info("PipeRuntimeConfigNodeAgent started");
   }
@@ -69,6 +84,9 @@ public class PipeConfigNodeRuntimeAgent implements IService {
     }
     isShutdown.set(true);
 
+    // Stop periodical job executor
+    pipePeriodicalJobExecutor.stop();
+
     PipeConfigNodeAgent.task().dropAllPipeTasks();
 
     LOGGER.info("PipeRuntimeConfigNodeAgent stopped");
@@ -143,4 +161,15 @@ public class PipeConfigNodeRuntimeAgent implements 
IService {
       PipeConfigNodeAgent.task().stopAllPipesWithCriticalException();
     }
   }
+
+  /////////////////////////// Periodical Job Executor 
///////////////////////////
+
+  public void registerPeriodicalJob(String id, Runnable periodicalJob, long 
intervalInSeconds) {
+    pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
+  }
+
+  public void registerPhantomReferenceCleanJob(
+      String id, Runnable periodicalJob, long intervalInSeconds) {
+    pipePeriodicalPhantomReferenceCleaner.register(id, periodicalJob, 
intervalInSeconds);
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index 8e17cb9b515..06a22353da3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -23,9 +23,12 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
+import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import 
org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
 import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
@@ -41,9 +44,12 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
+public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent
+    implements ReferenceTrackableEvent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConfigRegionSnapshotEvent.class);
   private String snapshotPath;
@@ -243,4 +249,53 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
         + " - "
         + super.coreReportMessage();
   }
+
+  /////////////////////////// ReferenceTrackableEvent 
///////////////////////////
+
+  @Override
+  protected void trackResource() {
+    PipeConfigNodeResourceManager.ref().trackPipeEventResource(this, 
eventResourceBuilder());
+  }
+
+  @Override
+  public PipeEventResource eventResourceBuilder() {
+    return new PipeConfigRegionSnapshotEventResource(
+        this.isReleased,
+        this.referenceCount,
+        this.resourceManager,
+        this.snapshotPath,
+        this.templateFilePath);
+  }
+
+  private static class PipeConfigRegionSnapshotEventResource extends 
PipeEventResource {
+
+    private final PipeSnapshotResourceManager resourceManager;
+    private final String snapshotPath;
+    private final String templateFilePath;
+
+    private PipeConfigRegionSnapshotEventResource(
+        final AtomicBoolean isReleased,
+        final AtomicInteger referenceCount,
+        final PipeSnapshotResourceManager resourceManager,
+        final String snapshotPath,
+        final String templateFilePath) {
+      super(isReleased, referenceCount);
+      this.resourceManager = resourceManager;
+      this.snapshotPath = snapshotPath;
+      this.templateFilePath = templateFilePath;
+    }
+
+    @Override
+    protected void finalizeResource() {
+      try {
+        resourceManager.decreaseSnapshotReference(snapshotPath);
+        if (!templateFilePath.isEmpty()) {
+          resourceManager.decreaseSnapshotReference(templateFilePath);
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            String.format("Decrease reference count for snapshot %s error.", 
snapshotPath), e);
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
index 18b962eab89..cc9652cb6f4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -43,6 +43,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
     PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
     PipeTemporaryMetaInCoordinatorMetrics.getInstance().bindTo(metricService);
     PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService);
+    PipeConfigNodeResourceMetrics.getInstance().bindTo(metricService);
   }
 
   @Override
@@ -55,5 +56,6 @@ public class PipeConfigNodeMetrics implements IMetricSet {
     PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
     
PipeTemporaryMetaInCoordinatorMetrics.getInstance().unbindFrom(metricService);
     PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService);
+    PipeConfigNodeResourceMetrics.getInstance().unbindFrom(metricService);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeResourceMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeResourceMetrics.java
new file mode 100644
index 00000000000..d7b1967fc0f
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeResourceMetrics.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConfigNodeResourceMetrics implements IMetricSet {
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    // phantom reference count
+    metricService.createAutoGauge(
+        Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        PipeConfigNodeResourceManager.ref(),
+        PipePhantomReferenceManager::getPhantomReferenceCount);
+  }
+
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    // phantom reference count
+    metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeConfigNodeResourceMetricsHolder {
+
+    private static final PipeConfigNodeResourceMetrics INSTANCE =
+        new PipeConfigNodeResourceMetrics();
+
+    private PipeConfigNodeResourceMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeConfigNodeResourceMetrics getInstance() {
+    return 
PipeConfigNodeResourceMetrics.PipeConfigNodeResourceMetricsHolder.INSTANCE;
+  }
+
+  private PipeConfigNodeResourceMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
index c13548c1d09..0dc88dcd250 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
@@ -20,12 +20,15 @@
 package org.apache.iotdb.confignode.manager.pipe.resource;
 
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
 import 
org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
+import 
org.apache.iotdb.confignode.manager.pipe.resource.ref.PipeConfigNodePhantomReferenceManager;
 
 public class PipeConfigNodeResourceManager {
 
   private final PipeSnapshotResourceManager pipeSnapshotResourceManager;
   private final PipeLogManager pipeLogManager;
+  private final PipePhantomReferenceManager pipePhantomReferenceManager;
 
   public static PipeSnapshotResourceManager snapshot() {
     return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE
@@ -36,11 +39,16 @@ public class PipeConfigNodeResourceManager {
     return 
PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE.pipeLogManager;
   }
 
+  public static PipePhantomReferenceManager ref() {
+    return PipeResourceManagerHolder.INSTANCE.pipePhantomReferenceManager;
+  }
+
   ///////////////////////////// SINGLETON /////////////////////////////
 
   private PipeConfigNodeResourceManager() {
     pipeSnapshotResourceManager = new PipeConfigNodeSnapshotResourceManager();
     pipeLogManager = new PipeLogManager();
+    pipePhantomReferenceManager = new PipeConfigNodePhantomReferenceManager();
   }
 
   private static class PipeResourceManagerHolder {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/ref/PipeConfigNodePhantomReferenceManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/ref/PipeConfigNodePhantomReferenceManager.java
new file mode 100644
index 00000000000..b867163e47e
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/ref/PipeConfigNodePhantomReferenceManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.resource.ref;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
+
+public class PipeConfigNodePhantomReferenceManager extends 
PipePhantomReferenceManager {
+
+  public PipeConfigNodePhantomReferenceManager() {
+    super();
+
+    PipeConfigNodeAgent.runtime()
+        .registerPhantomReferenceCleanJob(
+            "PipePhantomReferenceManager#gcHook()",
+            // NOTE: lambda CAN NOT be replaced with method reference
+            () -> super.gcHook(),
+            
PipeConfig.getInstance().getPipeEventReferenceEliminateIntervalSeconds());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index cb9484c49e7..d4f11db4ad7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -25,6 +25,8 @@ import 
org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import 
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -61,6 +63,9 @@ public class PipeDataNodeRuntimeAgent implements IService {
   private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
       new PipePeriodicalJobExecutor();
 
+  private final PipePeriodicalPhantomReferenceCleaner 
pipePeriodicalPhantomReferenceCleaner =
+      new PipePeriodicalPhantomReferenceCleaner();
+
   //////////////////////////// System Service Interface 
////////////////////////////
 
   public synchronized void preparePipeResources(
@@ -86,6 +91,10 @@ public class PipeDataNodeRuntimeAgent implements IService {
         PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
     pipePeriodicalJobExecutor.start();
 
+    if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
+      pipePeriodicalPhantomReferenceCleaner.start();
+    }
+
     isShutdown.set(false);
   }
 
@@ -230,4 +239,9 @@ public class PipeDataNodeRuntimeAgent implements IService {
   public void clearPeriodicalJobExecutor() {
     pipePeriodicalJobExecutor.clear();
   }
+
+  public void registerPhantomReferenceCleanJob(
+      String id, Runnable periodicalJob, long intervalInSeconds) {
+    pipePeriodicalPhantomReferenceCleaner.register(id, periodicalJob, 
intervalInSeconds);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/ReferenceTrackableEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/ReferenceTrackableEvent.java
new file mode 100644
index 00000000000..b3d64d68e9c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/ReferenceTrackableEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event;
+
+import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+
+public interface ReferenceTrackableEvent {
+
+  PipeEventResource eventResourceBuilder();
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index 0d3b9540f28..dd3097694cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -23,6 +23,9 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
+import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import 
org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -39,9 +42,12 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent {
+public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent
+    implements ReferenceTrackableEvent {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeSchemaRegionSnapshotEvent.class);
   private String mTreeSnapshotPath;
   private String tagLogSnapshotPath;
@@ -229,4 +235,56 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
         + " - "
         + super.coreReportMessage();
   }
+
+  /////////////////////////// ReferenceTrackableEvent 
///////////////////////////
+
+  @Override
+  protected void trackResource() {
+    PipeDataNodeResourceManager.ref().trackPipeEventResource(this, 
eventResourceBuilder());
+  }
+
+  @Override
+  public PipeEventResource eventResourceBuilder() {
+    return new PipeSchemaRegionSnapshotEventResource(
+        this.isReleased,
+        this.referenceCount,
+        this.resourceManager,
+        this.mTreeSnapshotPath,
+        this.tagLogSnapshotPath);
+  }
+
+  private static class PipeSchemaRegionSnapshotEventResource extends 
PipeEventResource {
+
+    private final PipeSnapshotResourceManager resourceManager;
+    private final String mTreeSnapshotPath;
+    private final String tagLogSnapshotPath;
+
+    private PipeSchemaRegionSnapshotEventResource(
+        final AtomicBoolean isReleased,
+        final AtomicInteger referenceCount,
+        final PipeSnapshotResourceManager resourceManager,
+        final String mTreeSnapshotPath,
+        final String tagLogSnapshotPath) {
+      super(isReleased, referenceCount);
+      this.resourceManager = resourceManager;
+      this.mTreeSnapshotPath = mTreeSnapshotPath;
+      this.tagLogSnapshotPath = tagLogSnapshotPath;
+    }
+
+    @Override
+    protected void finalizeResource() {
+      try {
+        resourceManager.decreaseSnapshotReference(mTreeSnapshotPath);
+        if (!tagLogSnapshotPath.isEmpty()) {
+          resourceManager.decreaseSnapshotReference(tagLogSnapshotPath);
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            String.format(
+                "Decrease reference count for mTree snapshot %s or tLog %s 
error.",
+                mTreeSnapshotPath, tagLogSnapshotPath),
+            e);
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 7a3ad4b1a20..faab03970fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -58,7 +60,7 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
-    implements TabletInsertionEvent, Accountable {
+    implements TabletInsertionEvent, ReferenceTrackableEvent, Accountable {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class);
@@ -439,4 +441,43 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath) 
: 0)
         + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
   }
+
+  /////////////////////////// ReferenceTrackableEvent 
///////////////////////////
+
+  @Override
+  protected void trackResource() {
+    PipeDataNodeResourceManager.ref().trackPipeEventResource(this, 
eventResourceBuilder());
+  }
+
+  @Override
+  public PipeEventResource eventResourceBuilder() {
+    return new PipeInsertNodeTabletInsertionEventResource(
+        this.isReleased, this.referenceCount, this.walEntryHandler);
+  }
+
+  private static class PipeInsertNodeTabletInsertionEventResource extends 
PipeEventResource {
+
+    private final WALEntryHandler walEntryHandler;
+
+    private PipeInsertNodeTabletInsertionEventResource(
+        final AtomicBoolean isReleased,
+        final AtomicInteger referenceCount,
+        final WALEntryHandler walEntryHandler) {
+      super(isReleased, referenceCount);
+      this.walEntryHandler = walEntryHandler;
+    }
+
+    @Override
+    protected void finalizeResource() {
+      try {
+        PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
+        // no need to release the containers' memory because it has already 
been GCed
+      } catch (final Exception e) {
+        LOGGER.warn(
+            String.format(
+                "Decrease reference count for memTable %d error.", 
walEntryHandler.getMemTableId()),
+            e);
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 825bb543843..922fd3eac74 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -24,7 +24,9 @@ import 
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -38,10 +40,12 @@ import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.record.Tablet;
 
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
-public class PipeRawTabletInsertionEvent extends EnrichedEvent implements 
TabletInsertionEvent {
-
+public class PipeRawTabletInsertionEvent extends EnrichedEvent
+    implements TabletInsertionEvent, ReferenceTrackableEvent {
   // For better calculation
   private static final long INSTANCE_SIZE =
       
RamUsageEstimator.shallowSizeOfInstance(PipeRawTabletInsertionEvent.class);
@@ -318,4 +322,35 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
         + " - "
         + super.coreReportMessage();
   }
+
+  /////////////////////////// ReferenceTrackableEvent 
///////////////////////////
+
+  @Override
+  protected void trackResource() {
+    PipeDataNodeResourceManager.ref().trackPipeEventResource(this, 
eventResourceBuilder());
+  }
+
+  @Override
+  public PipeEventResource eventResourceBuilder() {
+    return new PipeRawTabletInsertionEventResource(
+        this.isReleased, this.referenceCount, this.allocatedMemoryBlock);
+  }
+
+  private static class PipeRawTabletInsertionEventResource extends 
PipeEventResource {
+
+    private final PipeTabletMemoryBlock allocatedMemoryBlock;
+
+    private PipeRawTabletInsertionEventResource(
+        final AtomicBoolean isReleased,
+        final AtomicInteger referenceCount,
+        final PipeTabletMemoryBlock allocatedMemoryBlock) {
+      super(isReleased, referenceCount);
+      this.allocatedMemoryBlock = allocatedMemoryBlock;
+    }
+
+    @Override
+    protected void finalizeResource() {
+      allocatedMemoryBlock.close();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index ca8b3178560..d29afdbee83 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -25,6 +25,8 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
@@ -52,8 +54,10 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
-public class PipeTsFileInsertionEvent extends EnrichedEvent implements 
TsFileInsertionEvent {
+public class PipeTsFileInsertionEvent extends EnrichedEvent
+    implements TsFileInsertionEvent, ReferenceTrackableEvent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
 
@@ -546,4 +550,49 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
         + " - "
         + super.coreReportMessage();
   }
+
+  /////////////////////////// ReferenceTrackableEvent 
///////////////////////////
+
+  @Override
+  public void trackResource() {
+    PipeDataNodeResourceManager.ref().trackPipeEventResource(this, 
eventResourceBuilder());
+  }
+
+  @Override
+  public PipeEventResource eventResourceBuilder() {
+    return new PipeTsFileInsertionEventResource(
+        this.isReleased, this.referenceCount, this.tsFile, this.isWithMod, 
this.modFile);
+  }
+
+  /**
+   * Holds the file references of a {@link PipeTsFileInsertionEvent} outside of the event itself
+   * (this class is static and keeps no reference to the event), so that they can still be
+   * released once the event has been GCed.
+   */
+  private static class PipeTsFileInsertionEventResource extends PipeEventResource {
+
+    private final File tsFile;
+    private final boolean isWithMod;
+    private final File modFile;
+
+    private PipeTsFileInsertionEventResource(
+        final AtomicBoolean isReleased,
+        final AtomicInteger referenceCount,
+        final File tsFile,
+        final boolean isWithMod,
+        final File modFile) {
+      super(isReleased, referenceCount);
+      this.tsFile = tsFile;
+      this.isWithMod = isWithMod;
+      this.modFile = modFile;
+    }
+
+    /**
+     * Decreases the reference of the TsFile and, if {@code isWithMod} is set, of the mod file.
+     * Each file is released independently: a failure on the TsFile must not prevent the mod file
+     * from being released, otherwise its reference would be leaked.
+     */
+    @Override
+    protected void finalizeResource() {
+      decreaseFileReferenceQuietly(tsFile);
+      if (isWithMod) {
+        decreaseFileReferenceQuietly(modFile);
+      }
+    }
+
+    /** Decreases the reference of the given file, logging (instead of propagating) failures. */
+    private static void decreaseFileReferenceQuietly(final File file) {
+      try {
+        PipeDataNodeResourceManager.tsfile().decreaseFileReference(file);
+      } catch (final Exception e) {
+        // File#toString() returns the path, and formatting the object itself is null-safe
+        LOGGER.warn(String.format("Decrease reference count for TsFile %s error.", file), e);
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
index 04fb647a825..854ff9ffb10 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.metric;
 
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -66,6 +67,12 @@ public class PipeResourceMetrics implements IMetricSet {
         MetricLevel.IMPORTANT,
         PipeDataNodeResourceManager.tsfile(),
         PipeTsFileResourceManager::getLinkedTsfileCount);
+    // phantom reference count
+    metricService.createAutoGauge(
+        Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        PipeDataNodeResourceManager.ref(),
+        PipePhantomReferenceManager::getPhantomReferenceCount);
   }
 
   @Override
@@ -78,6 +85,8 @@ public class PipeResourceMetrics implements IMetricSet {
     // resource reference count
     metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_PINNED_MEMTABLE_COUNT.toString());
     metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_LINKED_TSFILE_COUNT.toString());
+    // phantom reference count
+    metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
index 74afd58681f..573106e45c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.db.pipe.resource;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
 import 
org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
+import 
org.apache.iotdb.db.pipe.resource.ref.PipeDataNodePhantomReferenceManager;
 import 
org.apache.iotdb.db.pipe.resource.snapshot.PipeDataNodeSnapshotResourceManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
@@ -38,6 +40,7 @@ public class PipeDataNodeResourceManager {
   private final PipeSnapshotResourceManager pipeSnapshotResourceManager;
   private final PipeMemoryManager pipeMemoryManager;
   private final PipeLogManager pipeLogManager;
+  private final PipePhantomReferenceManager pipePhantomReferenceManager;
 
   public static PipeTsFileResourceManager tsfile() {
     return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager;
@@ -69,6 +72,10 @@ public class PipeDataNodeResourceManager {
     return PipeResourceManagerHolder.INSTANCE.pipeLogManager;
   }
 
+  public static PipePhantomReferenceManager ref() {
+    return PipeResourceManagerHolder.INSTANCE.pipePhantomReferenceManager;
+  }
+
   ///////////////////////////// SINGLETON /////////////////////////////
 
   private PipeDataNodeResourceManager() {
@@ -77,6 +84,7 @@ public class PipeDataNodeResourceManager {
     pipeSnapshotResourceManager = new PipeDataNodeSnapshotResourceManager();
     pipeMemoryManager = new PipeMemoryManager();
     pipeLogManager = new PipeLogManager();
+    pipePhantomReferenceManager = new PipeDataNodePhantomReferenceManager();
   }
 
   private static class PipeResourceManagerHolder {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/ref/PipeDataNodePhantomReferenceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/ref/PipeDataNodePhantomReferenceManager.java
new file mode 100644
index 00000000000..991cfebe8fa
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/ref/PipeDataNodePhantomReferenceManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.ref;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+
+public class PipeDataNodePhantomReferenceManager extends 
PipePhantomReferenceManager {
+
+  public PipeDataNodePhantomReferenceManager() {
+    super();
+
+    PipeDataNodeAgent.runtime()
+        .registerPhantomReferenceCleanJob(
+            "PipePhantomReferenceManager#gcHook()",
+            // NOTE: lambda CAN NOT be a method reference (protected method, JDK 8 compatibility)
+            () -> super.gcHook(),
+            
PipeConfig.getInstance().getPipeEventReferenceEliminateIntervalSeconds());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 02a634c9c19..8b003621687 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.subscription.event.cache;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
@@ -98,7 +98,7 @@ public class SubscriptionPollResponseCache {
     final long maxMemorySizeInBytes =
         (long)
             (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
-                * 
PipeConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage());
+                * 
SubscriptionConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage());
 
     // properties required by pipe memory control framework
     final PipeMemoryBlock allocatedMemoryBlock =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 07057bd012d..4e65b254d39 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -136,6 +136,8 @@ public enum ThreadName {
   PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
   PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
   PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
+  PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER(
+      "Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"),
   PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
   SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ee2deab85ab..2d1585d6525 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -281,6 +281,9 @@ public class CommonConfig {
 
   private float subscriptionCacheMemoryUsagePercentage = 0.2F;
 
+  private boolean pipeEventReferenceTrackingEnabled = true;
+  private long pipeEventReferenceEliminateIntervalSeconds = 10;
+
   private int subscriptionSubtaskExecutorMaxThreadNum =
       Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
   private int subscriptionPrefetchTabletBatchMaxDelayInMs = 1000; // 1s
@@ -1230,6 +1233,23 @@ public class CommonConfig {
     this.twoStageAggregateSenderEndPointsCacheInMs = 
twoStageAggregateSenderEndPointsCacheInMs;
   }
 
+  public boolean getPipeEventReferenceTrackingEnabled() {
+    return pipeEventReferenceTrackingEnabled;
+  }
+
+  public void setPipeEventReferenceTrackingEnabled(boolean 
pipeEventReferenceTrackingEnabled) {
+    this.pipeEventReferenceTrackingEnabled = pipeEventReferenceTrackingEnabled;
+  }
+
+  public long getPipeEventReferenceEliminateIntervalSeconds() {
+    return pipeEventReferenceEliminateIntervalSeconds;
+  }
+
+  public void setPipeEventReferenceEliminateIntervalSeconds(
+      long pipeEventReferenceEliminateIntervalSeconds) {
+    this.pipeEventReferenceEliminateIntervalSeconds = 
pipeEventReferenceEliminateIntervalSeconds;
+  }
+
   public float getSubscriptionCacheMemoryUsagePercentage() {
     return subscriptionCacheMemoryUsagePercentage;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index fdd1b46efdd..e6ee4d5c464 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -621,6 +621,17 @@ public class CommonDescriptor {
             properties.getProperty(
                 "two_stage_aggregate_sender_end_points_cache_in_ms",
                 
String.valueOf(config.getTwoStageAggregateSenderEndPointsCacheInMs()))));
+
+    config.setPipeEventReferenceTrackingEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_event_reference_tracking_enabled",
+                
String.valueOf(config.getPipeEventReferenceTrackingEnabled()))));
+    config.setPipeEventReferenceEliminateIntervalSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_event_reference_eliminate_interval_seconds",
+                
String.valueOf(config.getPipeEventReferenceEliminateIntervalSeconds()))));
   }
 
   private void loadSubscriptionProps(Properties properties) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/AbstractPipePeriodicalJobExecutor.java
similarity index 69%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/AbstractPipePeriodicalJobExecutor.java
index fb3c87d7ef3..c6e24b5f475 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/AbstractPipePeriodicalJobExecutor.java
@@ -17,15 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent.runtime;
+package org.apache.iotdb.commons.pipe.agent.runtime;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.WrappedRunnable;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.service.DataNode;
 
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
@@ -38,19 +34,16 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Single thread to execute pipe periodical jobs on {@link DataNode}. This is 
for limiting the
- * thread num on the {@link DataNode} instance.
+ * Single thread to execute pipe periodical jobs on DataNode or ConfigNode. 
This is for limiting the
+ * thread num on the DataNode or ConfigNode instance.
  */
-public class PipePeriodicalJobExecutor {
+public abstract class AbstractPipePeriodicalJobExecutor {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipePeriodicalJobExecutor.class);
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AbstractPipePeriodicalJobExecutor.class);
 
-  private static final ScheduledExecutorService PERIODICAL_JOB_EXECUTOR =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR.getName());
-
-  private static final long MIN_INTERVAL_SECONDS =
-      
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
+  private final ScheduledExecutorService executorService;
+  private final long minIntervalSeconds;
 
   private long rounds;
   private Future<?> executorFuture;
@@ -58,6 +51,12 @@ public class PipePeriodicalJobExecutor {
   // <Periodical job, Interval in rounds>
   private final List<Pair<WrappedRunnable, Long>> periodicalJobs = new 
CopyOnWriteArrayList<>();
 
+  /**
+   * @param executorService the scheduler on which the periodical jobs are executed
+   * @param minIntervalSeconds the scheduling period in seconds; it is also the divisor used to
+   *     convert the interval of a registered job into rounds, hence it must be positive
+   * @throws IllegalArgumentException if {@code minIntervalSeconds} is not positive
+   */
+  public AbstractPipePeriodicalJobExecutor(
+      final ScheduledExecutorService executorService, final long minIntervalSeconds) {
+    // Fail fast: register() divides by this value and start() uses it as the scheduling delay.
+    if (minIntervalSeconds <= 0) {
+      throw new IllegalArgumentException(
+          "minIntervalSeconds must be positive, but got " + minIntervalSeconds);
+    }
+    this.executorService = executorService;
+    this.minIntervalSeconds = minIntervalSeconds;
+  }
+
   public void register(String id, Runnable periodicalJob, long 
intervalInSeconds) {
     periodicalJobs.add(
         new Pair<>(
@@ -71,11 +70,11 @@ public class PipePeriodicalJobExecutor {
                 }
               }
             },
-            Math.max(intervalInSeconds / MIN_INTERVAL_SECONDS, 1)));
+            Math.max(intervalInSeconds / minIntervalSeconds, 1)));
     LOGGER.info(
         "Pipe periodical job {} is registered successfully. Interval: {} 
seconds.",
         id,
-        Math.max(intervalInSeconds / MIN_INTERVAL_SECONDS, 1) * 
MIN_INTERVAL_SECONDS);
+        Math.max(intervalInSeconds / minIntervalSeconds, 1) * 
minIntervalSeconds);
   }
 
   public synchronized void start() {
@@ -84,16 +83,16 @@ public class PipePeriodicalJobExecutor {
 
       executorFuture =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-              PERIODICAL_JOB_EXECUTOR,
+              executorService,
               this::execute,
-              MIN_INTERVAL_SECONDS,
-              MIN_INTERVAL_SECONDS,
+              minIntervalSeconds,
+              minIntervalSeconds,
               TimeUnit.SECONDS);
       LOGGER.info("Pipe periodical job executor is started successfully.");
     }
   }
 
-  private void execute() {
+  protected void execute() {
     ++rounds;
 
     for (final Pair<WrappedRunnable, Long> periodicalJob : periodicalJobs) {
@@ -116,8 +115,4 @@ public class PipePeriodicalJobExecutor {
     periodicalJobs.clear();
     LOGGER.info("All pipe periodical jobs are cleared successfully.");
   }
-
-  public static long getMinIntervalSeconds() {
-    return MIN_INTERVAL_SECONDS;
-  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
new file mode 100644
index 00000000000..3226b3947f0
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+/**
+ * The shortest scheduling cycle for these jobs is {@link
+ * PipeConfig#getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()}, 
suitable for jobs that are
+ * NOT time-critical.
+ */
+public class PipePeriodicalJobExecutor extends 
AbstractPipePeriodicalJobExecutor {
+
+  public PipePeriodicalJobExecutor() {
+    super(
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR.getName()),
+        
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalPhantomReferenceCleaner.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalPhantomReferenceCleaner.java
new file mode 100644
index 00000000000..32f1549917c
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalPhantomReferenceCleaner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+
+/** The shortest scheduling cycle for these jobs is 1 second, suitable for jobs that 
are time-critical. */
+public class PipePeriodicalPhantomReferenceCleaner extends 
AbstractPipePeriodicalJobExecutor {
+
+  public PipePeriodicalPhantomReferenceCleaner() {
+    super(
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            
ThreadName.PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER.getName()),
+        1L);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 9cb99a226ba..59ac6db855c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -320,10 +320,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
   }
 
-  /////////////////////////////// Subscription ///////////////////////////////
+  /////////////////////////////// Ref ///////////////////////////////
 
-  public float getSubscriptionCacheMemoryUsagePercentage() {
-    return COMMON_CONFIG.getSubscriptionCacheMemoryUsagePercentage();
+  public boolean getPipeEventReferenceTrackingEnabled() {
+    return COMMON_CONFIG.getPipeEventReferenceTrackingEnabled();
+  }
+
+  public long getPipeEventReferenceEliminateIntervalSeconds() {
+    return COMMON_CONFIG.getPipeEventReferenceEliminateIntervalSeconds();
   }
 
   /////////////////////////////// Utils ///////////////////////////////
@@ -469,8 +473,10 @@ public class PipeConfig {
         "TwoStageAggregateSenderEndPointsCacheInMs: {}",
         getTwoStageAggregateSenderEndPointsCacheInMs());
 
+    LOGGER.info("PipeEventReferenceTrackingEnabled: {}", 
getPipeEventReferenceTrackingEnabled());
     LOGGER.info(
-        "SubscriptionCacheMemoryUsagePercentage: {}", 
getSubscriptionCacheMemoryUsagePercentage());
+        "PipeEventReferenceEliminateIntervalSeconds: {}",
+        getPipeEventReferenceEliminateIntervalSeconds());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index c04653adaa3..a9dae771967 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -98,6 +99,10 @@ public abstract class EnrichedEvent implements Event {
         });
   }
 
+  protected void trackResource() {
+    // do nothing by default
+  }
+
   /**
    * Increase the {@link EnrichedEvent#referenceCount} of this event. When the 
{@link
    * EnrichedEvent#referenceCount} is positive, the data in the resource of 
this {@link
@@ -125,7 +130,10 @@ public abstract class EnrichedEvent implements Event {
     }
 
     if (isSuccessful) {
-      referenceCount.incrementAndGet();
+      if (referenceCount.incrementAndGet() == 1
+          && PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
+        trackResource();
+      }
     } else {
       LOGGER.warn(
           "increase reference count failed, EnrichedEvent: {}, stack trace: 
{}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
new file mode 100644
index 00000000000..d95616e685d
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.resource.ref;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tracks {@link EnrichedEvent}s through {@link PhantomReference}s so that the resource of an event
+ * that has been garbage collected while its reference count was still positive can be released.
+ *
+ * <p>The set of tracked references and the reference queue are static, i.e. shared by all
+ * instances of this class.
+ */
+public abstract class PipePhantomReferenceManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipePhantomReferenceManager.class);
+
+  // Keeps the phantom references themselves strongly reachable: a reference object that becomes
+  // unreachable is never enqueued.
+  private static final Set<PipeEventPhantomReference> PIPE_EVENT_PHANTOM_REFERENCES =
+      ConcurrentHashMap.newKeySet();
+
+  private static final ReferenceQueue<EnrichedEvent> REFERENCE_QUEUE = new ReferenceQueue<>();
+
+  // Count observed by the last gcHook() run that cleaned nothing, -1 until the first such run.
+  private volatile long lastPhantomReferenceCount = -1;
+
+  public PipePhantomReferenceManager() {
+    // Do nothing now.
+  }
+
+  /**
+   * @return the number of phantom references currently tracked
+   */
+  public int getPhantomReferenceCount() {
+    return PIPE_EVENT_PHANTOM_REFERENCES.size();
+  }
+
+  /**
+   * Drains the reference queue and finalizes the resource of every dequeued reference. Does nothing
+   * when pipe event reference tracking is disabled.
+   *
+   * <p>At most as many references as are tracked when the method starts are processed, and each
+   * wait on the queue is bounded by 500 ms. Failures raised while finalizing are logged instead of
+   * being propagated or silently dropped. If the calling thread is interrupted, the already
+   * enqueued references are still drained without blocking and the interrupt status is restored
+   * before returning.
+   */
+  protected void gcHook() {
+    if (!PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
+      return;
+    }
+
+    final long startTime = System.currentTimeMillis();
+
+    // limit control to avoid infinite execution
+    final int maxCount = getPhantomReferenceCount();
+    int count = 0;
+    boolean interrupted = false;
+
+    try {
+      Reference<? extends EnrichedEvent> reference;
+      try {
+        while (count < maxCount && ((reference = REFERENCE_QUEUE.remove(500)) != null)) {
+          finalizeResource((PipeEventPhantomReference) reference);
+          count++;
+        }
+      } catch (final InterruptedException e) {
+        // Remember the interrupt; it is restored once this run has finished its bookkeeping.
+        interrupted = true;
+        // Finalize remaining references without blocking.
+        while (count < maxCount && ((reference = REFERENCE_QUEUE.poll()) != null)) {
+          finalizeResource((PipeEventPhantomReference) reference);
+          count++;
+        }
+      }
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to finalize pipe phantom reference(s), {} reference(s) finalized so far",
+          count,
+          e);
+    }
+
+    if (count != 0) {
+      LOGGER.info(
+          "Clean {} pipe phantom reference(s) successfully within {} ms, remaining reference count: {}",
+          count,
+          System.currentTimeMillis() - startTime,
+          getPhantomReferenceCount());
+    } else {
+      // Nothing was cleaned: only report the remaining count when it changed since the last such
+      // run, and never on the very first one.
+      final long currentPhantomReferenceCount = getPhantomReferenceCount();
+      if (currentPhantomReferenceCount != lastPhantomReferenceCount) {
+        if (lastPhantomReferenceCount != -1) {
+          LOGGER.info("Remaining pipe phantom reference count: {}", currentPhantomReferenceCount);
+        }
+        lastPhantomReferenceCount = currentPhantomReferenceCount;
+      }
+    }
+
+    if (interrupted) {
+      // Restore the interrupt status so that the owner of this thread can observe it.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Finalizes the resource behind the given reference and stops tracking the reference, even when
+   * finalizing fails.
+   */
+  private void finalizeResource(final PipeEventPhantomReference reference) {
+    try {
+      reference.finalizeResources();
+      reference.clear();
+    } finally {
+      PIPE_EVENT_PHANTOM_REFERENCES.remove(reference);
+    }
+  }
+
+  /** Phantom reference to an event that also carries the resource to finalize for that event. */
+  private static class PipeEventPhantomReference extends PhantomReference<EnrichedEvent> {
+
+    // Simple class name of the event, captured up front because the referent of a phantom
+    // reference cannot be retrieved later.
+    private final String holderMessage;
+    private PipeEventResource resource;
+
+    private PipeEventPhantomReference(
+        final EnrichedEvent event,
+        final PipeEventResource resource,
+        final ReferenceQueue<? super EnrichedEvent> queue) {
+      super(event, queue);
+      this.holderMessage = event.getClass().getSimpleName();
+      this.resource = resource;
+    }
+
+    /** Clears the reference count of the resource, if still held, and drops the resource. */
+    private void finalizeResources() {
+      if (this.resource != null) {
+        try {
+          this.resource.clearReferenceCount(holderMessage);
+        } finally {
+          this.resource = null;
+        }
+      }
+    }
+  }
+
+  ///////////////////// APIs provided for EnrichedEvent /////////////////////
+
+  /**
+   * Starts tracking the given event.
+   *
+   * @param event the event whose garbage collection is observed
+   * @param resource the resource to finalize once the event has been garbage collected
+   */
+  public void trackPipeEventResource(final EnrichedEvent event, final PipeEventResource resource) {
+    final PipeEventPhantomReference reference =
+        new PipeEventPhantomReference(event, resource, REFERENCE_QUEUE);
+    PIPE_EVENT_PHANTOM_REFERENCES.add(reference);
+  }
+
+  /**
+   * The resource of an event, described by the event's release flag and reference count, together
+   * with the action that releases it.
+   */
+  public abstract static class PipeEventResource {
+
+    private final AtomicBoolean isReleased;
+    private final AtomicInteger referenceCount;
+
+    protected PipeEventResource(
+        final AtomicBoolean isReleased, final AtomicInteger referenceCount) {
+      this.isReleased = isReleased;
+      this.referenceCount = referenceCount;
+    }
+
+    /**
+     * Marks the resource as released. If the reference count is still positive, a leak is reported
+     * and {@link #finalizeResource()} is invoked first. Does nothing when the resource has already
+     * been released.
+     *
+     * @param holderMessage description of the holder used in the leak report
+     */
+    private void clearReferenceCount(final String holderMessage) {
+      if (isReleased.get()) {
+        return;
+      }
+
+      if (referenceCount.get() >= 1) {
+        LOGGER.error("PIPE EVENT RESOURCE LEAK DETECTED: {}", holderMessage);
+        finalizeResource();
+      }
+
+      referenceCount.set(0);
+      isReleased.set(true);
+    }
+
+    /** Releases the underlying resource; invoked when a leak has been detected. */
+    protected abstract void finalizeResource();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 49344880a93..04757610064 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -156,6 +156,7 @@ public enum Metric {
   PIPE_MEM("pipe_mem"),
   PIPE_PINNED_MEMTABLE_COUNT("pipe_pinned_memtable_count"),
   PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"),
+  PIPE_PHANTOM_REFERENCE_COUNT("pipe_phantom_reference_count"),
   
PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"),
   PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"),
   PIPE_PROCEDURE("pipe_procedure"),

Reply via email to