This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2783e801f19 [To dev/1.3] Pipe: release resources of pipe event that
have been GCed but the reference count is not zero by PhantomReference (#13360,
#13756, #13813, #13962, part of #13992) (#14355)
2783e801f19 is described below
commit 2783e801f1951fb4643aa220112bb3171d109352
Author: VGalaxies <[email protected]>
AuthorDate: Sat Dec 7 02:21:46 2024 +0800
[To dev/1.3] Pipe: release resources of pipe event that have been GCed but
the reference count is not zero by PhantomReference (#13360, #13756, #13813,
#13962, part of #13992) (#14355)
* Pipe: release resources of pipe event that have been GCed but the
reference count is not zero by PhantomReference (#13360)
* Pipe: remove protected method reference under jdk8 & add GH yml for
checking compatibility issues (#13756)
* Pipe: add seperated thread pool for phantom reference clean job (#13813)
* Pipe: reduce pipe phantom reference logging frequency by omitting
printing if count unchanged (#13962)
* cherry-pick part of #13992
---
.github/workflows/compile-check.yml | 52 +++++++
.../agent/runtime/PipeConfigNodeRuntimeAgent.java | 29 ++++
.../pipe/event/PipeConfigRegionSnapshotEvent.java | 57 ++++++-
.../manager/pipe/metric/PipeConfigNodeMetrics.java | 2 +
.../pipe/metric/PipeConfigNodeResourceMetrics.java | 69 +++++++++
.../resource/PipeConfigNodeResourceManager.java | 8 +
.../ref/PipeConfigNodePhantomReferenceManager.java | 38 +++++
.../agent/runtime/PipeDataNodeRuntimeAgent.java | 14 ++
.../db/pipe/event/ReferenceTrackableEvent.java | 27 ++++
.../schema/PipeSchemaRegionSnapshotEvent.java | 60 +++++++-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 43 +++++-
.../common/tablet/PipeRawTabletInsertionEvent.java | 39 ++++-
.../common/tsfile/PipeTsFileInsertionEvent.java | 51 ++++++-
.../iotdb/db/pipe/metric/PipeResourceMetrics.java | 9 ++
.../pipe/resource/PipeDataNodeResourceManager.java | 8 +
.../ref/PipeDataNodePhantomReferenceManager.java | 38 +++++
.../event/cache/SubscriptionPollResponseCache.java | 4 +-
.../iotdb/commons/concurrent/ThreadName.java | 2 +
.../apache/iotdb/commons/conf/CommonConfig.java | 20 +++
.../iotdb/commons/conf/CommonDescriptor.java | 11 ++
.../AbstractPipePeriodicalJobExecutor.java} | 45 +++---
.../agent/runtime/PipePeriodicalJobExecutor.java | 39 +++++
.../PipePeriodicalPhantomReferenceCleaner.java | 34 +++++
.../iotdb/commons/pipe/config/PipeConfig.java | 14 +-
.../iotdb/commons/pipe/event/EnrichedEvent.java | 10 +-
.../resource/ref/PipePhantomReferenceManager.java | 168 +++++++++++++++++++++
.../iotdb/commons/service/metric/enums/Metric.java | 1 +
27 files changed, 854 insertions(+), 38 deletions(-)
diff --git a/.github/workflows/compile-check.yml
b/.github/workflows/compile-check.yml
new file mode 100644
index 00000000000..b20e8edd999
--- /dev/null
+++ b/.github/workflows/compile-check.yml
@@ -0,0 +1,52 @@
+# This workflow will compile IoTDB under jdk8 to check for compatibility
issues
+
+name: Compile Check
+
+on:
+ push:
+ branches:
+ - master
+ - 'rel/1.*'
+ - 'rc/1.*'
+ paths-ignore:
+ - 'docs/**'
+ - 'site/**'
+ pull_request:
+ branches:
+ - master
+ - 'rel/1.*'
+ - 'rc/1.*'
+ paths-ignore:
+ - 'docs/**'
+ - 'site/**'
+ # allow manually run the action:
+ workflow_dispatch:
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
+env:
+ MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
-Dmaven.wagon.http.retryHandler.class=standard
-Dmaven.wagon.http.retryHandler.count=3
+ MAVEN_ARGS: --batch-mode --no-transfer-progress
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+
+jobs:
+ compile-check:
+ strategy:
+ fail-fast: false
+ matrix:
+ java: [8]
+ os: [ ubuntu-latest ]
+ runs-on: ${{ matrix.os }}
+ steps:
+ - uses: actions/checkout@v4
+ - name: Set up JDK ${{ matrix.java }}
+ uses: actions/setup-java@v4
+ with:
+ distribution: liberica
+ java-version: ${{ matrix.java }}
+ - name: Compiler Test
+ shell: bash
+ run: |
+ mvn clean compile -P with-integration-tests -ntp
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
index cc6056522ef..b89e401d5c7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
@@ -22,6 +22,8 @@ package
org.apache.iotdb.confignode.manager.pipe.agent.runtime;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -45,6 +47,12 @@ public class PipeConfigNodeRuntimeAgent implements IService {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
+ new PipePeriodicalJobExecutor();
+
+ private final PipePeriodicalPhantomReferenceCleaner
pipePeriodicalPhantomReferenceCleaner =
+ new PipePeriodicalPhantomReferenceCleaner();
+
@Override
public synchronized void start() {
PipeConfig.getInstance().printAllConfigs();
@@ -58,6 +66,13 @@ public class PipeConfigNodeRuntimeAgent implements IService {
// Clean receiver file dir
PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();
+ // Start periodical job executor
+ pipePeriodicalJobExecutor.start();
+
+ if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
+ pipePeriodicalPhantomReferenceCleaner.start();
+ }
+
isShutdown.set(false);
LOGGER.info("PipeRuntimeConfigNodeAgent started");
}
@@ -69,6 +84,9 @@ public class PipeConfigNodeRuntimeAgent implements IService {
}
isShutdown.set(true);
+ // Stop periodical job executor
+ pipePeriodicalJobExecutor.stop();
+
PipeConfigNodeAgent.task().dropAllPipeTasks();
LOGGER.info("PipeRuntimeConfigNodeAgent stopped");
@@ -143,4 +161,15 @@ public class PipeConfigNodeRuntimeAgent implements
IService {
PipeConfigNodeAgent.task().stopAllPipesWithCriticalException();
}
}
+
+ /////////////////////////// Periodical Job Executor
///////////////////////////
+
+ public void registerPeriodicalJob(String id, Runnable periodicalJob, long
intervalInSeconds) {
+ pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
+ }
+
+ public void registerPhantomReferenceCleanJob(
+ String id, Runnable periodicalJob, long intervalInSeconds) {
+ pipePeriodicalPhantomReferenceCleaner.register(id, periodicalJob,
intervalInSeconds);
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index 8e17cb9b515..06a22353da3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -23,9 +23,12 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
+import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import
org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -41,9 +44,12 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
+public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent
+ implements ReferenceTrackableEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfigRegionSnapshotEvent.class);
private String snapshotPath;
@@ -243,4 +249,53 @@ public class PipeConfigRegionSnapshotEvent extends
PipeSnapshotEvent {
+ " - "
+ super.coreReportMessage();
}
+
+ /////////////////////////// ReferenceTrackableEvent
///////////////////////////
+
+ @Override
+ protected void trackResource() {
+ PipeConfigNodeResourceManager.ref().trackPipeEventResource(this,
eventResourceBuilder());
+ }
+
+ @Override
+ public PipeEventResource eventResourceBuilder() {
+ return new PipeConfigRegionSnapshotEventResource(
+ this.isReleased,
+ this.referenceCount,
+ this.resourceManager,
+ this.snapshotPath,
+ this.templateFilePath);
+ }
+
+ private static class PipeConfigRegionSnapshotEventResource extends
PipeEventResource {
+
+ private final PipeSnapshotResourceManager resourceManager;
+ private final String snapshotPath;
+ private final String templateFilePath;
+
+ private PipeConfigRegionSnapshotEventResource(
+ final AtomicBoolean isReleased,
+ final AtomicInteger referenceCount,
+ final PipeSnapshotResourceManager resourceManager,
+ final String snapshotPath,
+ final String templateFilePath) {
+ super(isReleased, referenceCount);
+ this.resourceManager = resourceManager;
+ this.snapshotPath = snapshotPath;
+ this.templateFilePath = templateFilePath;
+ }
+
+ @Override
+ protected void finalizeResource() {
+ try {
+ resourceManager.decreaseSnapshotReference(snapshotPath);
+ if (!templateFilePath.isEmpty()) {
+ resourceManager.decreaseSnapshotReference(templateFilePath);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn(
+ String.format("Decrease reference count for snapshot %s error.",
snapshotPath), e);
+ }
+ }
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
index 18b962eab89..cc9652cb6f4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -43,6 +43,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
PipeTemporaryMetaInCoordinatorMetrics.getInstance().bindTo(metricService);
PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService);
+ PipeConfigNodeResourceMetrics.getInstance().bindTo(metricService);
}
@Override
@@ -55,5 +56,6 @@ public class PipeConfigNodeMetrics implements IMetricSet {
PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
PipeTemporaryMetaInCoordinatorMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService);
+ PipeConfigNodeResourceMetrics.getInstance().unbindFrom(metricService);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeResourceMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeResourceMetrics.java
new file mode 100644
index 00000000000..d7b1967fc0f
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeResourceMetrics.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConfigNodeResourceMetrics implements IMetricSet {
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ @Override
+ public void bindTo(final AbstractMetricService metricService) {
+ // phantom reference count
+ metricService.createAutoGauge(
+ Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ PipeConfigNodeResourceManager.ref(),
+ PipePhantomReferenceManager::getPhantomReferenceCount);
+ }
+
+ @Override
+ public void unbindFrom(final AbstractMetricService metricService) {
+ // phantom reference count
+ metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeConfigNodeResourceMetricsHolder {
+
+ private static final PipeConfigNodeResourceMetrics INSTANCE =
+ new PipeConfigNodeResourceMetrics();
+
+ private PipeConfigNodeResourceMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeConfigNodeResourceMetrics getInstance() {
+ return
PipeConfigNodeResourceMetrics.PipeConfigNodeResourceMetricsHolder.INSTANCE;
+ }
+
+ private PipeConfigNodeResourceMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
index c13548c1d09..0dc88dcd250 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
@@ -20,12 +20,15 @@
package org.apache.iotdb.confignode.manager.pipe.resource;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import
org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
+import
org.apache.iotdb.confignode.manager.pipe.resource.ref.PipeConfigNodePhantomReferenceManager;
public class PipeConfigNodeResourceManager {
private final PipeSnapshotResourceManager pipeSnapshotResourceManager;
private final PipeLogManager pipeLogManager;
+ private final PipePhantomReferenceManager pipePhantomReferenceManager;
public static PipeSnapshotResourceManager snapshot() {
return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE
@@ -36,11 +39,16 @@ public class PipeConfigNodeResourceManager {
return
PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE.pipeLogManager;
}
+ public static PipePhantomReferenceManager ref() {
+ return PipeResourceManagerHolder.INSTANCE.pipePhantomReferenceManager;
+ }
+
///////////////////////////// SINGLETON /////////////////////////////
private PipeConfigNodeResourceManager() {
pipeSnapshotResourceManager = new PipeConfigNodeSnapshotResourceManager();
pipeLogManager = new PipeLogManager();
+ pipePhantomReferenceManager = new PipeConfigNodePhantomReferenceManager();
}
private static class PipeResourceManagerHolder {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/ref/PipeConfigNodePhantomReferenceManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/ref/PipeConfigNodePhantomReferenceManager.java
new file mode 100644
index 00000000000..b867163e47e
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/ref/PipeConfigNodePhantomReferenceManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.resource.ref;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
+import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
+
+public class PipeConfigNodePhantomReferenceManager extends
PipePhantomReferenceManager {
+
+ public PipeConfigNodePhantomReferenceManager() {
+ super();
+
+ PipeConfigNodeAgent.runtime()
+ .registerPhantomReferenceCleanJob(
+ "PipePhantomReferenceManager#gcHook()",
+ // NOTE: lambda CAN NOT be replaced with method reference
+ () -> super.gcHook(),
+
PipeConfig.getInstance().getPipeEventReferenceEliminateIntervalSeconds());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index cb9484c49e7..d4f11db4ad7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -25,6 +25,8 @@ import
org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -61,6 +63,9 @@ public class PipeDataNodeRuntimeAgent implements IService {
private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
new PipePeriodicalJobExecutor();
+ private final PipePeriodicalPhantomReferenceCleaner
pipePeriodicalPhantomReferenceCleaner =
+ new PipePeriodicalPhantomReferenceCleaner();
+
//////////////////////////// System Service Interface
////////////////////////////
public synchronized void preparePipeResources(
@@ -86,6 +91,10 @@ public class PipeDataNodeRuntimeAgent implements IService {
PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
pipePeriodicalJobExecutor.start();
+ if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
+ pipePeriodicalPhantomReferenceCleaner.start();
+ }
+
isShutdown.set(false);
}
@@ -230,4 +239,9 @@ public class PipeDataNodeRuntimeAgent implements IService {
public void clearPeriodicalJobExecutor() {
pipePeriodicalJobExecutor.clear();
}
+
+ public void registerPhantomReferenceCleanJob(
+ String id, Runnable periodicalJob, long intervalInSeconds) {
+ pipePeriodicalPhantomReferenceCleaner.register(id, periodicalJob,
intervalInSeconds);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/ReferenceTrackableEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/ReferenceTrackableEvent.java
new file mode 100644
index 00000000000..b3d64d68e9c
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/ReferenceTrackableEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event;
+
+import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+
+public interface ReferenceTrackableEvent {
+
+ PipeEventResource eventResourceBuilder();
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index 0d3b9540f28..dd3097694cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -23,6 +23,9 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
+import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import
org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -39,9 +42,12 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent {
+public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent
+ implements ReferenceTrackableEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeSchemaRegionSnapshotEvent.class);
private String mTreeSnapshotPath;
private String tagLogSnapshotPath;
@@ -229,4 +235,56 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
+ " - "
+ super.coreReportMessage();
}
+
+ /////////////////////////// ReferenceTrackableEvent
///////////////////////////
+
+ @Override
+ protected void trackResource() {
+ PipeDataNodeResourceManager.ref().trackPipeEventResource(this,
eventResourceBuilder());
+ }
+
+ @Override
+ public PipeEventResource eventResourceBuilder() {
+ return new PipeSchemaRegionSnapshotEventResource(
+ this.isReleased,
+ this.referenceCount,
+ this.resourceManager,
+ this.mTreeSnapshotPath,
+ this.tagLogSnapshotPath);
+ }
+
+ private static class PipeSchemaRegionSnapshotEventResource extends
PipeEventResource {
+
+ private final PipeSnapshotResourceManager resourceManager;
+ private final String mTreeSnapshotPath;
+ private final String tagLogSnapshotPath;
+
+ private PipeSchemaRegionSnapshotEventResource(
+ final AtomicBoolean isReleased,
+ final AtomicInteger referenceCount,
+ final PipeSnapshotResourceManager resourceManager,
+ final String mTreeSnapshotPath,
+ final String tagLogSnapshotPath) {
+ super(isReleased, referenceCount);
+ this.resourceManager = resourceManager;
+ this.mTreeSnapshotPath = mTreeSnapshotPath;
+ this.tagLogSnapshotPath = tagLogSnapshotPath;
+ }
+
+ @Override
+ protected void finalizeResource() {
+ try {
+ resourceManager.decreaseSnapshotReference(mTreeSnapshotPath);
+ if (!tagLogSnapshotPath.isEmpty()) {
+ resourceManager.decreaseSnapshotReference(tagLogSnapshotPath);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn(
+ String.format(
+ "Decrease reference count for mTree snapshot %s or tLog %s
error.",
+ mTreeSnapshotPath, tagLogSnapshotPath),
+ e);
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 7a3ad4b1a20..faab03970fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -58,7 +60,7 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
- implements TabletInsertionEvent, Accountable {
+ implements TabletInsertionEvent, ReferenceTrackableEvent, Accountable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class);
@@ -439,4 +441,43 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
+ (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath)
: 0)
+ (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
}
+
+ /////////////////////////// ReferenceTrackableEvent
///////////////////////////
+
+ @Override
+ protected void trackResource() {
+ PipeDataNodeResourceManager.ref().trackPipeEventResource(this,
eventResourceBuilder());
+ }
+
+ @Override
+ public PipeEventResource eventResourceBuilder() {
+ return new PipeInsertNodeTabletInsertionEventResource(
+ this.isReleased, this.referenceCount, this.walEntryHandler);
+ }
+
+ private static class PipeInsertNodeTabletInsertionEventResource extends
PipeEventResource {
+
+ private final WALEntryHandler walEntryHandler;
+
+ private PipeInsertNodeTabletInsertionEventResource(
+ final AtomicBoolean isReleased,
+ final AtomicInteger referenceCount,
+ final WALEntryHandler walEntryHandler) {
+ super(isReleased, referenceCount);
+ this.walEntryHandler = walEntryHandler;
+ }
+
+ @Override
+ protected void finalizeResource() {
+ try {
+ PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
+ // no need to release the containers' memory because it has already
been GCed
+ } catch (final Exception e) {
+ LOGGER.warn(
+ String.format(
+ "Decrease reference count for memTable %d error.",
walEntryHandler.getMemTableId()),
+ e);
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 825bb543843..922fd3eac74 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -24,7 +24,9 @@ import
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -38,10 +40,12 @@ import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.record.Tablet;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
-public class PipeRawTabletInsertionEvent extends EnrichedEvent implements
TabletInsertionEvent {
-
+public class PipeRawTabletInsertionEvent extends EnrichedEvent
+ implements TabletInsertionEvent, ReferenceTrackableEvent {
// For better calculation
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(PipeRawTabletInsertionEvent.class);
@@ -318,4 +322,35 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
+ " - "
+ super.coreReportMessage();
}
+
+ /////////////////////////// ReferenceTrackableEvent
///////////////////////////
+
+ @Override
+ protected void trackResource() {
+ PipeDataNodeResourceManager.ref().trackPipeEventResource(this,
eventResourceBuilder());
+ }
+
+ @Override
+ public PipeEventResource eventResourceBuilder() {
+ return new PipeRawTabletInsertionEventResource(
+ this.isReleased, this.referenceCount, this.allocatedMemoryBlock);
+ }
+
+ private static class PipeRawTabletInsertionEventResource extends
PipeEventResource {
+
+ private final PipeTabletMemoryBlock allocatedMemoryBlock;
+
+ private PipeRawTabletInsertionEventResource(
+ final AtomicBoolean isReleased,
+ final AtomicInteger referenceCount,
+ final PipeTabletMemoryBlock allocatedMemoryBlock) {
+ super(isReleased, referenceCount);
+ this.allocatedMemoryBlock = allocatedMemoryBlock;
+ }
+
+ @Override
+ protected void finalizeResource() {
+ allocatedMemoryBlock.close();
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index ca8b3178560..d29afdbee83 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -25,6 +25,8 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
@@ -52,8 +54,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
-public class PipeTsFileInsertionEvent extends EnrichedEvent implements
TsFileInsertionEvent {
+public class PipeTsFileInsertionEvent extends EnrichedEvent
+ implements TsFileInsertionEvent, ReferenceTrackableEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
@@ -546,4 +550,49 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
+ " - "
+ super.coreReportMessage();
}
+
+ /////////////////////////// ReferenceTrackableEvent
///////////////////////////
+
+ @Override
+ public void trackResource() {
+ PipeDataNodeResourceManager.ref().trackPipeEventResource(this,
eventResourceBuilder());
+ }
+
+ @Override
+ public PipeEventResource eventResourceBuilder() {
+ return new PipeTsFileInsertionEventResource(
+ this.isReleased, this.referenceCount, this.tsFile, this.isWithMod,
this.modFile);
+ }
+
+ private static class PipeTsFileInsertionEventResource extends
PipeEventResource {
+
+ private final File tsFile;
+ private final boolean isWithMod;
+ private final File modFile;
+
+ private PipeTsFileInsertionEventResource(
+ final AtomicBoolean isReleased,
+ final AtomicInteger referenceCount,
+ final File tsFile,
+ final boolean isWithMod,
+ final File modFile) {
+ super(isReleased, referenceCount);
+ this.tsFile = tsFile;
+ this.isWithMod = isWithMod;
+ this.modFile = modFile;
+ }
+
+ @Override
+ protected void finalizeResource() {
+ try {
+ PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile);
+ if (isWithMod) {
+ PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn(
+ String.format("Decrease reference count for TsFile %s error.",
tsFile.getPath()), e);
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
index 04fb647a825..854ff9ffb10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.metric;
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -66,6 +67,12 @@ public class PipeResourceMetrics implements IMetricSet {
MetricLevel.IMPORTANT,
PipeDataNodeResourceManager.tsfile(),
PipeTsFileResourceManager::getLinkedTsfileCount);
+ // phantom reference count
+ metricService.createAutoGauge(
+ Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ PipeDataNodeResourceManager.ref(),
+ PipePhantomReferenceManager::getPhantomReferenceCount);
}
@Override
@@ -78,6 +85,8 @@ public class PipeResourceMetrics implements IMetricSet {
// resource reference count
metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_PINNED_MEMTABLE_COUNT.toString());
metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_LINKED_TSFILE_COUNT.toString());
+ // phantom reference count
+ metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
index 74afd58681f..573106e45c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.db.pipe.resource;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import
org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
+import
org.apache.iotdb.db.pipe.resource.ref.PipeDataNodePhantomReferenceManager;
import
org.apache.iotdb.db.pipe.resource.snapshot.PipeDataNodeSnapshotResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
@@ -38,6 +40,7 @@ public class PipeDataNodeResourceManager {
private final PipeSnapshotResourceManager pipeSnapshotResourceManager;
private final PipeMemoryManager pipeMemoryManager;
private final PipeLogManager pipeLogManager;
+ private final PipePhantomReferenceManager pipePhantomReferenceManager;
public static PipeTsFileResourceManager tsfile() {
return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager;
@@ -69,6 +72,10 @@ public class PipeDataNodeResourceManager {
return PipeResourceManagerHolder.INSTANCE.pipeLogManager;
}
+ public static PipePhantomReferenceManager ref() {
+ return PipeResourceManagerHolder.INSTANCE.pipePhantomReferenceManager;
+ }
+
///////////////////////////// SINGLETON /////////////////////////////
private PipeDataNodeResourceManager() {
@@ -77,6 +84,7 @@ public class PipeDataNodeResourceManager {
pipeSnapshotResourceManager = new PipeDataNodeSnapshotResourceManager();
pipeMemoryManager = new PipeMemoryManager();
pipeLogManager = new PipeLogManager();
+ pipePhantomReferenceManager = new PipeDataNodePhantomReferenceManager();
}
private static class PipeResourceManagerHolder {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/ref/PipeDataNodePhantomReferenceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/ref/PipeDataNodePhantomReferenceManager.java
new file mode 100644
index 00000000000..991cfebe8fa
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/ref/PipeDataNodePhantomReferenceManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.ref;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+
+public class PipeDataNodePhantomReferenceManager extends
PipePhantomReferenceManager {
+
+ public PipeDataNodePhantomReferenceManager() {
+ super();
+
+ PipeDataNodeAgent.runtime()
+ .registerPhantomReferenceCleanJob(
+ "PipePhantomReferenceManager#gcHook()",
+ // NOTE: lambda CAN NOT be replaced with method reference
+ () -> super.gcHook(),
+
PipeConfig.getInstance().getPipeEventReferenceEliminateIntervalSeconds());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 02a634c9c19..8b003621687 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.subscription.event.cache;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
@@ -98,7 +98,7 @@ public class SubscriptionPollResponseCache {
final long maxMemorySizeInBytes =
(long)
(PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
- *
PipeConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage());
+ *
SubscriptionConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage());
// properties required by pipe memory control framework
final PipeMemoryBlock allocatedMemoryBlock =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 07057bd012d..4e65b254d39 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -136,6 +136,8 @@ public enum ThreadName {
PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
+ PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER(
+ "Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"),
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ee2deab85ab..2d1585d6525 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -281,6 +281,9 @@ public class CommonConfig {
private float subscriptionCacheMemoryUsagePercentage = 0.2F;
+ private boolean pipeEventReferenceTrackingEnabled = true;
+ private long pipeEventReferenceEliminateIntervalSeconds = 10;
+
private int subscriptionSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
private int subscriptionPrefetchTabletBatchMaxDelayInMs = 1000; // 1s
@@ -1230,6 +1233,23 @@ public class CommonConfig {
this.twoStageAggregateSenderEndPointsCacheInMs =
twoStageAggregateSenderEndPointsCacheInMs;
}
+ public boolean getPipeEventReferenceTrackingEnabled() {
+ return pipeEventReferenceTrackingEnabled;
+ }
+
+ public void setPipeEventReferenceTrackingEnabled(boolean
pipeEventReferenceTrackingEnabled) {
+ this.pipeEventReferenceTrackingEnabled = pipeEventReferenceTrackingEnabled;
+ }
+
+ public long getPipeEventReferenceEliminateIntervalSeconds() {
+ return pipeEventReferenceEliminateIntervalSeconds;
+ }
+
+ public void setPipeEventReferenceEliminateIntervalSeconds(
+ long pipeEventReferenceEliminateIntervalSeconds) {
+ this.pipeEventReferenceEliminateIntervalSeconds =
pipeEventReferenceEliminateIntervalSeconds;
+ }
+
public float getSubscriptionCacheMemoryUsagePercentage() {
return subscriptionCacheMemoryUsagePercentage;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index fdd1b46efdd..e6ee4d5c464 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -621,6 +621,17 @@ public class CommonDescriptor {
properties.getProperty(
"two_stage_aggregate_sender_end_points_cache_in_ms",
String.valueOf(config.getTwoStageAggregateSenderEndPointsCacheInMs()))));
+
+ config.setPipeEventReferenceTrackingEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_event_reference_tracking_enabled",
+
String.valueOf(config.getPipeEventReferenceTrackingEnabled()))));
+ config.setPipeEventReferenceEliminateIntervalSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_event_reference_eliminate_interval_seconds",
+
String.valueOf(config.getPipeEventReferenceEliminateIntervalSeconds()))));
}
private void loadSubscriptionProps(Properties properties) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/AbstractPipePeriodicalJobExecutor.java
similarity index 69%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/AbstractPipePeriodicalJobExecutor.java
index fb3c87d7ef3..c6e24b5f475 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/AbstractPipePeriodicalJobExecutor.java
@@ -17,15 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent.runtime;
+package org.apache.iotdb.commons.pipe.agent.runtime;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.WrappedRunnable;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.service.DataNode;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -38,19 +34,16 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
- * Single thread to execute pipe periodical jobs on {@link DataNode}. This is
for limiting the
- * thread num on the {@link DataNode} instance.
+ * Single thread to execute pipe periodical jobs on DataNode or ConfigNode.
This is for limiting the
+ * thread num on the DataNode or ConfigNode instance.
*/
-public class PipePeriodicalJobExecutor {
+public abstract class AbstractPipePeriodicalJobExecutor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipePeriodicalJobExecutor.class);
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(AbstractPipePeriodicalJobExecutor.class);
- private static final ScheduledExecutorService PERIODICAL_JOB_EXECUTOR =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR.getName());
-
- private static final long MIN_INTERVAL_SECONDS =
-
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
+ private final ScheduledExecutorService executorService;
+ private final long minIntervalSeconds;
private long rounds;
private Future<?> executorFuture;
@@ -58,6 +51,12 @@ public class PipePeriodicalJobExecutor {
// <Periodical job, Interval in rounds>
private final List<Pair<WrappedRunnable, Long>> periodicalJobs = new
CopyOnWriteArrayList<>();
+ public AbstractPipePeriodicalJobExecutor(
+ final ScheduledExecutorService executorService, final long
minIntervalSeconds) {
+ this.executorService = executorService;
+ this.minIntervalSeconds = minIntervalSeconds;
+ }
+
public void register(String id, Runnable periodicalJob, long
intervalInSeconds) {
periodicalJobs.add(
new Pair<>(
@@ -71,11 +70,11 @@ public class PipePeriodicalJobExecutor {
}
}
},
- Math.max(intervalInSeconds / MIN_INTERVAL_SECONDS, 1)));
+ Math.max(intervalInSeconds / minIntervalSeconds, 1)));
LOGGER.info(
"Pipe periodical job {} is registered successfully. Interval: {}
seconds.",
id,
- Math.max(intervalInSeconds / MIN_INTERVAL_SECONDS, 1) *
MIN_INTERVAL_SECONDS);
+ Math.max(intervalInSeconds / minIntervalSeconds, 1) *
minIntervalSeconds);
}
public synchronized void start() {
@@ -84,16 +83,16 @@ public class PipePeriodicalJobExecutor {
executorFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- PERIODICAL_JOB_EXECUTOR,
+ executorService,
this::execute,
- MIN_INTERVAL_SECONDS,
- MIN_INTERVAL_SECONDS,
+ minIntervalSeconds,
+ minIntervalSeconds,
TimeUnit.SECONDS);
LOGGER.info("Pipe periodical job executor is started successfully.");
}
}
- private void execute() {
+ protected void execute() {
++rounds;
for (final Pair<WrappedRunnable, Long> periodicalJob : periodicalJobs) {
@@ -116,8 +115,4 @@ public class PipePeriodicalJobExecutor {
periodicalJobs.clear();
LOGGER.info("All pipe periodical jobs are cleared successfully.");
}
-
- public static long getMinIntervalSeconds() {
- return MIN_INTERVAL_SECONDS;
- }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
new file mode 100644
index 00000000000..3226b3947f0
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+/**
+ * The shortest scheduling cycle for these jobs is {@link
+ * PipeConfig#getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()},
suitable for jobs that are
+ * NOT time-critical.
+ */
+public class PipePeriodicalJobExecutor extends
AbstractPipePeriodicalJobExecutor {
+
+ public PipePeriodicalJobExecutor() {
+ super(
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR.getName()),
+
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalPhantomReferenceCleaner.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalPhantomReferenceCleaner.java
new file mode 100644
index 00000000000..32f1549917c
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalPhantomReferenceCleaner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+
+/** The shortest scheduling cycle for these jobs is 1, suitable for jobs that
are time-critical. */
+public class PipePeriodicalPhantomReferenceCleaner extends
AbstractPipePeriodicalJobExecutor {
+
+ public PipePeriodicalPhantomReferenceCleaner() {
+ super(
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+
ThreadName.PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER.getName()),
+ 1L);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 9cb99a226ba..59ac6db855c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -320,10 +320,14 @@ public class PipeConfig {
return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
}
- /////////////////////////////// Subscription ///////////////////////////////
+ /////////////////////////////// Ref ///////////////////////////////
- public float getSubscriptionCacheMemoryUsagePercentage() {
- return COMMON_CONFIG.getSubscriptionCacheMemoryUsagePercentage();
+ public boolean getPipeEventReferenceTrackingEnabled() {
+ return COMMON_CONFIG.getPipeEventReferenceTrackingEnabled();
+ }
+
+ public long getPipeEventReferenceEliminateIntervalSeconds() {
+ return COMMON_CONFIG.getPipeEventReferenceEliminateIntervalSeconds();
}
/////////////////////////////// Utils ///////////////////////////////
@@ -469,8 +473,10 @@ public class PipeConfig {
"TwoStageAggregateSenderEndPointsCacheInMs: {}",
getTwoStageAggregateSenderEndPointsCacheInMs());
+ LOGGER.info("PipeEventReferenceTrackingEnabled: {}",
getPipeEventReferenceTrackingEnabled());
LOGGER.info(
- "SubscriptionCacheMemoryUsagePercentage: {}",
getSubscriptionCacheMemoryUsagePercentage());
+ "PipeEventReferenceEliminateIntervalSeconds: {}",
+ getPipeEventReferenceEliminateIntervalSeconds());
}
/////////////////////////////// Singleton ///////////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index c04653adaa3..a9dae771967 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.pipe.api.event.Event;
@@ -98,6 +99,10 @@ public abstract class EnrichedEvent implements Event {
});
}
+ protected void trackResource() {
+ // do nothing by default
+ }
+
/**
* Increase the {@link EnrichedEvent#referenceCount} of this event. When the
{@link
* EnrichedEvent#referenceCount} is positive, the data in the resource of
this {@link
@@ -125,7 +130,10 @@ public abstract class EnrichedEvent implements Event {
}
if (isSuccessful) {
- referenceCount.incrementAndGet();
+ if (referenceCount.incrementAndGet() == 1
+ && PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
+ trackResource();
+ }
} else {
LOGGER.warn(
"increase reference count failed, EnrichedEvent: {}, stack trace:
{}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
new file mode 100644
index 00000000000..d95616e685d
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.resource.ref;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class PipePhantomReferenceManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipePhantomReferenceManager.class);
+
+ private static final Set<PipeEventPhantomReference>
PIPE_EVENT_PHANTOM_REFERENCES =
+ ConcurrentHashMap.newKeySet();
+
+ private static final ReferenceQueue<EnrichedEvent> REFERENCE_QUEUE = new
ReferenceQueue<>();
+
+ private volatile long lastPhantomReferenceCount = -1;
+
+ public PipePhantomReferenceManager() {
+ // Do nothing now.
+ }
+
+ public int getPhantomReferenceCount() {
+ return PIPE_EVENT_PHANTOM_REFERENCES.size();
+ }
+
+ protected void gcHook() {
+ if (!PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
+ return;
+ }
+
+ final long startTime = System.currentTimeMillis();
+
+ // limit control to avoid infinite execution
+ final int maxCount = getPhantomReferenceCount();
+ int count = 0;
+
+ Reference<? extends EnrichedEvent> reference;
+ try {
+ while (count < maxCount && ((reference = REFERENCE_QUEUE.remove(500)) !=
null)) {
+ finalizeResource((PipeEventPhantomReference) reference);
+ count++;
+ }
+ } catch (final InterruptedException e) {
+ // Finalize remaining references.
+ while (count < maxCount && ((reference = REFERENCE_QUEUE.poll()) !=
null)) {
+ finalizeResource((PipeEventPhantomReference) reference);
+ count++;
+ }
+ } catch (final Exception e) {
+ // Nowhere to really log this.
+ }
+
+ if (count != 0) {
+ LOGGER.info(
+ "Clean {} pipe phantom reference(s) successfully within {} ms,
remaining reference count: {}",
+ count,
+ System.currentTimeMillis() - startTime,
+ getPhantomReferenceCount());
+ } else {
+ final long currentPhantomReferenceCount = getPhantomReferenceCount();
+ if (currentPhantomReferenceCount != lastPhantomReferenceCount) {
+ if (lastPhantomReferenceCount != -1) {
+ LOGGER.info("Remaining pipe phantom reference count: {}",
currentPhantomReferenceCount);
+ }
+ lastPhantomReferenceCount = currentPhantomReferenceCount;
+ }
+ }
+ }
+
+ private void finalizeResource(final PipeEventPhantomReference reference) {
+ try {
+ reference.finalizeResources();
+ reference.clear();
+ } finally {
+ PIPE_EVENT_PHANTOM_REFERENCES.remove(reference);
+ }
+ }
+
+ private static class PipeEventPhantomReference extends
PhantomReference<EnrichedEvent> {
+
+ private final String holderMessage;
+ private PipeEventResource resource;
+
+ private PipeEventPhantomReference(
+ final EnrichedEvent event,
+ final PipeEventResource resource,
+ final ReferenceQueue<? super EnrichedEvent> queue) {
+ super(event, queue);
+ this.holderMessage = event.getClass().getSimpleName();
+ this.resource = resource;
+ }
+
+ private void finalizeResources() {
+ if (this.resource != null) {
+ try {
+ this.resource.clearReferenceCount(holderMessage);
+ } finally {
+ this.resource = null;
+ }
+ }
+ }
+ }
+
+ ///////////////////// APIs provided for EnrichedEvent /////////////////////
+
+ public void trackPipeEventResource(final EnrichedEvent event, final
PipeEventResource resource) {
+ final PipeEventPhantomReference reference =
+ new PipeEventPhantomReference(event, resource, REFERENCE_QUEUE);
+ PIPE_EVENT_PHANTOM_REFERENCES.add(reference);
+ }
+
+ public abstract static class PipeEventResource {
+
+ private final AtomicBoolean isReleased;
+ private final AtomicInteger referenceCount;
+
+ protected PipeEventResource(
+ final AtomicBoolean isReleased, final AtomicInteger referenceCount) {
+ this.isReleased = isReleased;
+ this.referenceCount = referenceCount;
+ }
+
+ private void clearReferenceCount(final String holderMessage) {
+ if (isReleased.get()) {
+ return;
+ }
+
+ if (referenceCount.get() >= 1) {
+ LOGGER.error("PIPE EVENT RESOURCE LEAK DETECTED: {}", holderMessage);
+ finalizeResource();
+ }
+
+ referenceCount.set(0);
+ isReleased.set(true);
+ }
+
+ protected abstract void finalizeResource();
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 49344880a93..04757610064 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -156,6 +156,7 @@ public enum Metric {
PIPE_MEM("pipe_mem"),
PIPE_PINNED_MEMTABLE_COUNT("pipe_pinned_memtable_count"),
PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"),
+ PIPE_PHANTOM_REFERENCE_COUNT("pipe_phantom_reference_count"),
PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"),
PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"),
PIPE_PROCEDURE("pipe_procedure"),