This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a5b668ba67c Pipe: introduce global cache for TsFile metadatas and 
support TTL of Pipe TsFile (#11240)
a5b668ba67c is described below

commit a5b668ba67c665a2efe7faaa0b5519460ab8192d
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Nov 24 01:20:29 2023 +0800

    Pipe: introduce global cache for TsFile metadatas and support TTL of Pipe 
TsFile (#11240)
    
    Currently, if a TsFile is processed by N pipes, its metadata is 
constructed N times, so the overhead becomes high when N is large. This commit 
adds a global cache to avoid repeated construction and redundant IO.
    
    This commit also introduces a TTL for TsFiles pinned by the pipe engine, 
just as for WALs.
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../agent/runtime/PipePeriodicalJobExecutor.java   |  65 +++---
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  22 +++
 .../tsfile/TsFileInsertionDataContainer.java       |  50 +++--
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |   4 +-
 .../realtime/PipeRealtimeDataRegionExtractor.java  |   5 +-
 .../listener/PipeInsertionDataNodeListener.java    |  10 +-
 .../db/pipe/resource/memory/PipeMemoryBlock.java   |   2 +-
 .../db/pipe/resource/memory/PipeMemoryManager.java |  37 ++++
 .../pipe/resource/memory/PipeMemoryWeighUtil.java  |  58 ++++++
 .../pipe/resource/tsfile/PipeTsFileResource.java   | 217 +++++++++++++++++++++
 .../resource/tsfile/PipeTsFileResourceManager.java | 210 ++++++++++++++++----
 .../db/pipe/resource/wal/PipeWALResource.java      |   4 +-
 .../pipe/resource/wal/PipeWALResourceManager.java  |  67 +++----
 .../resource/PipeTsFileResourceManagerTest.java    |  18 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   2 -
 .../apache/iotdb/commons/conf/CommonConfig.java    |   2 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |   4 +
 17 files changed, 648 insertions(+), 129 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index df9240cabe4..c3770201371 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -21,14 +21,17 @@ package org.apache.iotdb.db.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -45,34 +48,44 @@ public class PipePeriodicalJobExecutor {
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
           ThreadName.PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR.getName());
 
-  private static final long CRON_EVENT_INJECTOR_INTERVAL_SECONDS =
+  private static final long MIN_INTERVAL_SECONDS =
       
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
-  private long cronEventInjectRoundsInterval;
-
-  private static final long MEMORY_EXPANDER_INTERVAL_SECONDS =
-      PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds();
-  private long memoryExpandRoundsInterval;
-
-  // Currently we use the CRON_EVENT_INJECTOR_INTERVAL_SECONDS as minimum 
interval
-  private static final long EXECUTOR_INTERVAL_SECONDS = 
CRON_EVENT_INJECTOR_INTERVAL_SECONDS;
   private long rounds;
-
   private Future<?> executorFuture;
 
+  // <Periodical job, Interval in rounds>
+  private static final List<Pair<WrappedRunnable, Long>> periodicalJobs = new 
ArrayList<>();
+
+  public synchronized void register(String id, Runnable periodicalJob, long 
intervalInSeconds) {
+    periodicalJobs.add(
+        new Pair<>(
+            new WrappedRunnable() {
+              @Override
+              public void runMayThrow() {
+                try {
+                  periodicalJob.run();
+                } catch (Exception e) {
+                  LOGGER.warn("Periodical job {} failed.", id, e);
+                }
+              }
+            },
+            Math.max(intervalInSeconds / MIN_INTERVAL_SECONDS, 1)));
+    LOGGER.info(
+        "Pipe periodical job {} is registered successfully. Interval: {} 
seconds.",
+        id,
+        Math.max(intervalInSeconds / MIN_INTERVAL_SECONDS, 1) * 
MIN_INTERVAL_SECONDS);
+  }
+
   public synchronized void start() {
     if (executorFuture == null) {
       rounds = 0;
-      cronEventInjectRoundsInterval =
-          Math.max(CRON_EVENT_INJECTOR_INTERVAL_SECONDS / 
EXECUTOR_INTERVAL_SECONDS, 1);
-      memoryExpandRoundsInterval =
-          Math.max(MEMORY_EXPANDER_INTERVAL_SECONDS / 
EXECUTOR_INTERVAL_SECONDS, 1);
 
       executorFuture =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
               PERIODICAL_JOB_EXECUTOR,
               this::execute,
-              EXECUTOR_INTERVAL_SECONDS,
-              EXECUTOR_INTERVAL_SECONDS,
+              MIN_INTERVAL_SECONDS,
+              MIN_INTERVAL_SECONDS,
               TimeUnit.SECONDS);
       LOGGER.info("Pipe periodical job executor is started successfully.");
     }
@@ -81,12 +94,10 @@ public class PipePeriodicalJobExecutor {
   private synchronized void execute() {
     ++rounds;
 
-    if (rounds % cronEventInjectRoundsInterval == 0) {
-      PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(false);
-    }
-
-    if (rounds % memoryExpandRoundsInterval == 0) {
-      PipeResourceManager.memory().tryExpandAll();
+    for (final Pair<WrappedRunnable, Long> periodicalJob : periodicalJobs) {
+      if (rounds % periodicalJob.right == 0) {
+        periodicalJob.left.run();
+      }
     }
   }
 
@@ -97,4 +108,10 @@ public class PipePeriodicalJobExecutor {
       LOGGER.info("Pipe periodical job executor is stopped successfully.");
     }
   }
+
+  @TestOnly
+  public synchronized void clear() {
+    periodicalJobs.clear();
+    LOGGER.info("All pipe periodical jobs are cleared successfully.");
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index e9c6a50920f..9d85ec0d5ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
@@ -134,4 +135,25 @@ public class PipeRuntimeAgent implements IService {
       PipeAgent.task().stopAllPipesWithCriticalException();
     }
   }
+
+  /////////////////////////// Periodical Job Executor 
///////////////////////////
+
+  public void registerPeriodicalJob(String id, Runnable periodicalJob, long 
intervalInSeconds) {
+    pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
+  }
+
+  @TestOnly
+  public void startPeriodicalJobExecutor() {
+    pipePeriodicalJobExecutor.start();
+  }
+
+  @TestOnly
+  public void stopPeriodicalJobExecutor() {
+    pipePeriodicalJobExecutor.stop();
+  }
+
+  @TestOnly
+  public void clearPeriodicalJobExecutor() {
+    pipePeriodicalJobExecutor.clear();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 10b746b3f36..72faf0d83b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -95,28 +97,48 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
     this.sourceEvent = sourceEvent;
 
     try {
-      allocatedMemoryBlock =
-          PipeResourceManager.memory()
-              .forceAllocate(
-                  
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
+      final PipeTsFileResourceManager tsFileResourceManager = 
PipeResourceManager.tsfile();
+      final Map<String, List<String>> deviceMeasurementsMap;
 
-      tsFileSequenceReader = new 
TsFileSequenceReader(tsFile.getAbsolutePath(), true, true);
+      // TsFileReader is not thread-safe, so we need to create it here and 
close it later.
+      long memoryRequiredInBytes =
+          
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes();
+      tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, 
true);
       tsFileReader = new TsFileReader(tsFileSequenceReader);
 
-      deviceMeasurementsMapIterator = 
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
-      deviceIsAlignedMap = readDeviceIsAlignedMap();
-      measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+      if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
+        // These read-only objects can be found in cache.
+        deviceIsAlignedMap = 
tsFileResourceManager.getDeviceIsAlignedMapFromCache(tsFile);
+        measurementDataTypeMap = 
tsFileResourceManager.getMeasurementDataTypeMapFromCache(tsFile);
+        deviceMeasurementsMap = 
tsFileResourceManager.getDeviceMeasurementsMapFromCache(tsFile);
+      } else {
+        // We need to create these objects here and remove them later.
+        deviceIsAlignedMap = readDeviceIsAlignedMap();
+        memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfStr2Bool(deviceIsAlignedMap);
+
+        measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+        memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+
+        deviceMeasurementsMap = 
tsFileSequenceReader.getDeviceMeasurementsMap();
+        memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfStr2StrList(deviceMeasurementsMap);
+      }
+      allocatedMemoryBlock = 
PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
+
+      deviceMeasurementsMapIterator =
+          
filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator();
+
+      // No longer need this. Help GC.
+      tsFileSequenceReader.clearCachedDeviceMetadata();
     } catch (Exception e) {
       close();
       throw e;
     }
   }
 
-  private Map<String, List<String>> filterDeviceMeasurementsMapByPattern() 
throws IOException {
+  private Map<String, List<String>> filterDeviceMeasurementsMapByPattern(
+      Map<String, List<String>> originalDeviceMeasurementsMap) {
     final Map<String, List<String>> filteredDeviceMeasurementsMap = new 
HashMap<>();
-
-    for (Map.Entry<String, List<String>> entry :
-        tsFileSequenceReader.getDeviceMeasurementsMap().entrySet()) {
+    for (Map.Entry<String, List<String>> entry : 
originalDeviceMeasurementsMap.entrySet()) {
       final String deviceId = entry.getKey();
 
       // case 1: for example, pattern is root.a.b or pattern is null and 
device is root.a.b.c
@@ -250,6 +272,8 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
       LOGGER.warn("Failed to close TsFileSequenceReader", e);
     }
 
-    allocatedMemoryBlock.close();
+    if (allocatedMemoryBlock != null) {
+      allocatedMemoryBlock.close();
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index b5c6aea4f15..c1a1b440579 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -282,7 +282,9 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
   public void close() throws Exception {
     historicalExtractor.close();
     realtimeExtractor.close();
-    PipeExtractorMetrics.getInstance().deregister(taskID);
+    if (Objects.nonNull(taskID)) {
+      PipeExtractorMetrics.getInstance().deregister(taskID);
+    }
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index fb4e6e521c9..d7c80082bfb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
@@ -119,7 +120,9 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
 
   @Override
   public void close() throws Exception {
-    
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, 
this);
+    if (Objects.nonNull(dataRegionId)) {
+      
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, 
this);
+    }
 
     synchronized (isClosed) {
       clearPendingQueue();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index f65cd12a6ef..a2e2b76b8bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.extractor.realtime.listener;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner;
@@ -144,7 +146,13 @@ public class PipeInsertionDataNodeListener {
 
   /////////////////////////////// singleton ///////////////////////////////
 
-  private PipeInsertionDataNodeListener() {}
+  private PipeInsertionDataNodeListener() {
+    PipeAgent.runtime()
+        .registerPeriodicalJob(
+            "PipeInsertionDataNodeListener#listenToHeartbeat(false)",
+            () -> listenToHeartbeat(false),
+            
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
+  }
 
   public static PipeInsertionDataNodeListener getInstance() {
     return PipeChangeDataCaptureListenerHolder.INSTANCE;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 7523965aa91..642683fb703 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -165,7 +165,7 @@ public class PipeMemoryBlock implements AutoCloseable {
         if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
           try {
             pipeMemoryManager.release(this);
-            return;
+            break;
           } finally {
             lock.unlock();
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 7d8ead94814..ad4e1951a1e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.resource.memory;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.record.Tablet;
@@ -58,6 +59,14 @@ public class PipeMemoryManager {
 
   private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
 
+  public PipeMemoryManager() {
+    PipeAgent.runtime()
+        .registerPeriodicalJob(
+            "PipeMemoryManager#tryExpandAll()",
+            this::tryExpandAll,
+            PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds());
+  }
+
   public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
       throws PipeRuntimeOutOfMemoryCriticalException {
     if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
@@ -94,6 +103,34 @@ public class PipeMemoryManager {
     return forceAllocate(calculateTabletSizeInBytes(tablet));
   }
 
+  /**
+   * Allocate a memory block for pipe only if memory already used is less than 
specified threshold.
+   *
+   * @param sizeInBytes size of memory needed to allocate
+   * @param usedThreshold proportion of memory used, ranged from 0.0 to 1.0
+   * @return {@code null} if the proportion of memory already used exceeds 
{@code usedThreshold}.
+   *     Will return a memory block otherwise.
+   */
+  public synchronized PipeMemoryBlock forceAllocateIfSufficient(
+      long sizeInBytes, float usedThreshold) {
+    if (usedThreshold < 0.0f || usedThreshold > 1.0f) {
+      return null;
+    }
+    if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes
+        && (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES < 
usedThreshold) {
+      return forceAllocate(sizeInBytes);
+    } else {
+      long memoryToShrink =
+          Math.max(
+              usedMemorySizeInBytes - (long) (TOTAL_MEMORY_SIZE_IN_BYTES * 
usedThreshold),
+              sizeInBytes);
+      if (tryShrink4Allocate(memoryToShrink)) {
+        return forceAllocate(sizeInBytes);
+      }
+    }
+    return null;
+  }
+
   private long calculateTabletSizeInBytes(Tablet tablet) {
     long totalSizeInBytes = 0;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
new file mode 100644
index 00000000000..2445bc510e9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+import java.util.Map;
+
+public class PipeMemoryWeighUtil {
+  /** Estimates memory usage of a {@link Map}<{@link String}, {@link 
Boolean}>. */
+  public static long memoryOfStr2Bool(Map<String, Boolean> map) {
+    long usageInBytes = 0L;
+    for (Map.Entry<String, Boolean> entry : map.entrySet()) {
+      usageInBytes = usageInBytes + MemUtils.getStringMem(entry.getKey()) + 1L;
+    }
+    return usageInBytes + 16L; // add the overhead of map
+  }
+
+  /** Estimates memory usage of a {@link Map}<{@link String}, {@link 
TSDataType}>. */
+  public static long memoryOfStr2TSDataType(Map<String, TSDataType> map) {
+    long usageInBytes = 0L;
+    for (Map.Entry<String, TSDataType> entry : map.entrySet()) {
+      usageInBytes = usageInBytes + MemUtils.getStringMem(entry.getKey()) + 4L;
+    }
+    return usageInBytes + 16L; // add the overhead of map
+  }
+
+  /** Estimates memory usage of a {@link Map}<{@link String}, {@link 
List}<{@link String}>>. */
+  public static long memoryOfStr2StrList(Map<String, List<String>> map) {
+    long usageInBytes = 0L;
+    for (Map.Entry<String, List<String>> entry : map.entrySet()) {
+      usageInBytes += MemUtils.getStringMem(entry.getKey());
+      for (String str : entry.getValue()) {
+        usageInBytes += MemUtils.getStringMem(str);
+      }
+    }
+    return usageInBytes + 16L; // add the overhead of map
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
new file mode 100644
index 00000000000..bbca18a4ca1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.tsfile;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeTsFileResource implements AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileResource.class);
+
+  private final File hardlinkOrCopiedFile;
+  private final boolean isTsFile;
+
+  public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
+  private final AtomicInteger referenceCount;
+  private final AtomicLong lastUnpinToZeroTime;
+
+  private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.5f;
+  private PipeMemoryBlock allocatedMemoryBlock;
+  private Map<String, List<String>> deviceMeasurementsMap = null;
+  private Map<String, Boolean> deviceIsAlignedMap = null;
+  private Map<String, TSDataType> measurementDataTypeMap = null;
+
+  public PipeTsFileResource(File hardlinkOrCopiedFile, boolean isTsFile) {
+    this.hardlinkOrCopiedFile = hardlinkOrCopiedFile;
+    this.isTsFile = isTsFile;
+
+    referenceCount = new AtomicInteger(1);
+    lastUnpinToZeroTime = new AtomicLong(Long.MAX_VALUE);
+  }
+
+  public File getFile() {
+    return hardlinkOrCopiedFile;
+  }
+
+  ///////////////////// Reference Count /////////////////////
+
+  public int getReferenceCount() {
+    return referenceCount.get();
+  }
+
+  public int increaseAndGetReference() {
+    return referenceCount.addAndGet(1);
+  }
+
+  public int decreaseAndGetReference() {
+    final int finalReferenceCount = referenceCount.addAndGet(-1);
+    if (finalReferenceCount == 0) {
+      lastUnpinToZeroTime.set(System.currentTimeMillis());
+    }
+    if (finalReferenceCount < 0) {
+      LOGGER.warn("PipeTsFileResource's reference count is decreased to below 
0.");
+    }
+    return finalReferenceCount;
+  }
+
+  public synchronized boolean closeIfOutOfTimeToLive() throws IOException {
+    if (referenceCount.get() <= 0
+        && (deviceMeasurementsMap == null // Not cached yet.
+            || System.currentTimeMillis() - lastUnpinToZeroTime.get()
+                > TSFILE_MIN_TIME_TO_LIVE_IN_MS)) {
+      close();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (deviceMeasurementsMap != null) {
+      deviceMeasurementsMap = null;
+    }
+
+    if (deviceIsAlignedMap != null) {
+      deviceIsAlignedMap = null;
+    }
+
+    if (measurementDataTypeMap != null) {
+      measurementDataTypeMap = null;
+    }
+
+    if (allocatedMemoryBlock != null) {
+      allocatedMemoryBlock.close();
+      allocatedMemoryBlock = null;
+    }
+
+    Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
+
+    LOGGER.info("PipeTsFileResource: Closed tsfile {} and cleaned up.", 
hardlinkOrCopiedFile);
+  }
+
+  //////////////////////////// Cache Getter ////////////////////////////
+
+  public synchronized Map<String, List<String>> tryGetDeviceMeasurementsMap() 
throws IOException {
+    if (deviceMeasurementsMap == null && isTsFile) {
+      cacheObjectsIfAbsent();
+    }
+    return deviceMeasurementsMap;
+  }
+
+  public synchronized Map<String, Boolean> tryGetDeviceIsAlignedMap() throws 
IOException {
+    if (deviceIsAlignedMap == null && isTsFile) {
+      cacheObjectsIfAbsent();
+    }
+    return deviceIsAlignedMap;
+  }
+
+  public synchronized Map<String, TSDataType> tryGetMeasurementDataTypeMap() 
throws IOException {
+    if (measurementDataTypeMap == null && isTsFile) {
+      cacheObjectsIfAbsent();
+    }
+    return measurementDataTypeMap;
+  }
+
+  synchronized boolean cacheObjectsIfAbsent() throws IOException {
+    if (!isTsFile) {
+      return false;
+    }
+
+    if (allocatedMemoryBlock != null) {
+      // This means objects are already cached.
+      return true;
+    }
+
+    // See if pipe memory is sufficient to be allocated for 
TsFileSequenceReader.
+    // Only allocate when pipe memory used is less than 50%, because memory 
here
+    // is hard to shrink and may consume too much memory.
+    allocatedMemoryBlock =
+        PipeResourceManager.memory()
+            .forceAllocateIfSufficient(
+                
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes(),
+                MEMORY_SUFFICIENT_THRESHOLD);
+    if (allocatedMemoryBlock == null) {
+      LOGGER.info(
+          "PipeTsFileResource: Failed to create TsFileSequenceReader for 
tsfile {} in cache, because memory usage is high",
+          hardlinkOrCopiedFile.getPath());
+      return false;
+    }
+
+    long memoryRequiredInBytes = 0L;
+    try (TsFileSequenceReader sequenceReader =
+        new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, true)) {
+      deviceMeasurementsMap = sequenceReader.getDeviceMeasurementsMap();
+      memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfStr2StrList(deviceMeasurementsMap);
+
+      deviceIsAlignedMap = new HashMap<>();
+      final TsFileDeviceIterator deviceIsAlignedIterator =
+          sequenceReader.getAllDevicesIteratorWithIsAligned();
+      while (deviceIsAlignedIterator.hasNext()) {
+        final Pair<String, Boolean> deviceIsAlignedPair = 
deviceIsAlignedIterator.next();
+        deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(), 
deviceIsAlignedPair.getRight());
+      }
+      memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfStr2Bool(deviceIsAlignedMap);
+
+      measurementDataTypeMap = sequenceReader.getFullPathDataTypeMap();
+      memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+    }
+    // Release memory of TsFileSequenceReader.
+    allocatedMemoryBlock.close();
+    allocatedMemoryBlock = null;
+
+    // Allocate again for the cached objects.
+    allocatedMemoryBlock =
+        PipeResourceManager.memory()
+            .forceAllocateIfSufficient(memoryRequiredInBytes, 
MEMORY_SUFFICIENT_THRESHOLD);
+    if (allocatedMemoryBlock == null) {
+      LOGGER.info(
+          "PipeTsFileResource: Failed to cache objects for tsfile {} in cache, 
because memory usage is high",
+          hardlinkOrCopiedFile.getPath());
+      deviceIsAlignedMap = null;
+      deviceMeasurementsMap = null;
+      measurementDataTypeMap = null;
+      return false;
+    }
+
+    LOGGER.info(
+        "PipeTsFileResource: Cached objects for tsfile {}.", 
hardlinkOrCopiedFile.getPath());
+    return true;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index af92a0fa553..315c2a7be60 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -21,7 +21,12 @@ package org.apache.iotdb.db.pipe.resource.tsfile;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -29,14 +34,50 @@ import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class PipeTsFileResourceManager {
 
-  private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new HashMap<>();
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileResourceManager.class);
+
+  private final Map<String, PipeTsFileResource> hardlinkOrCopiedFileToPipeTsFileResourceMap =
+      new HashMap<>();
+  private final ReentrantLock lock = new ReentrantLock();
+
+  public PipeTsFileResourceManager() {
+    PipeAgent.runtime()
+        .registerPeriodicalJob(
+            "PipeTsFileResourceManager#ttlCheck()",
+            this::ttlCheck,
+            Math.max(PipeTsFileResource.TSFILE_MIN_TIME_TO_LIVE_IN_MS / 1000, 1));
+  }
+
+  private void ttlCheck() {
+    final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
+        hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<String, PipeTsFileResource> entry = iterator.next();
 
-  /** Cache the File objects here to avoid redundancy */
-  private final Map<String, File> fileNameToFileMap = new HashMap<>();
+      lock.lock();
+      try {
+        if (entry.getValue().closeIfOutOfTimeToLive()) {
+          iterator.remove();
+        } else {
+          LOGGER.info(
+              "Pipe file (file name: {}) is still referenced {} times",
+              entry.getKey(),
+              entry.getValue().getReferenceCount());
+        }
+      } catch (IOException e) {
+        LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", e);
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
 
   /**
    * given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the
@@ -57,34 +98,48 @@ public class PipeTsFileResourceManager {
    * @return the hardlink or copied file
    * @throws IOException when create hardlink or copy file failed
    */
-  public synchronized File increaseFileReference(File file, boolean isTsFile) throws IOException {
-    // if the file is already a hardlink or copied file, just increase reference count and return it
-    if (increaseReferenceIfExists(file.getPath())) {
-      return file;
-    }
+  public File increaseFileReference(File file, boolean isTsFile) throws IOException {
+    lock.lock();
+    try {
+      // if the file is already a hardlink or copied file,
+      // just increase reference count and return it
+      if (increaseReferenceIfExists(file.getPath())) {
+        return file;
+      }
 
-    // if the file is not a hardlink or copied file, check if there is a related hardlink or copied
-    // file in pipe dir. if so, increase reference count and return it
-    final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
-    if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
-      return fileNameToFileMap.get(hardlinkOrCopiedFile.getPath());
-    }
+      // if the file is not a hardlink or copied file, check if there is a related hardlink or
+      // copied file in pipe dir. if so, increase reference count and return it
+      final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
+      if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
+        return hardlinkOrCopiedFileToPipeTsFileResourceMap
+            .get(hardlinkOrCopiedFile.getPath())
+            .getFile();
+      }
 
-    // if the file is not a hardlink or copied file, and there is no related hardlink or copied
-    // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for
-    // the hardlink or copied file, and return the hardlink or copied file.
-    hardlinkOrCopiedFileToReferenceMap.put(hardlinkOrCopiedFile.getPath(), 1);
-    fileNameToFileMap.put(hardlinkOrCopiedFile.getPath(), hardlinkOrCopiedFile);
-    // if the file is a tsfile, create a hardlink in pipe dir and return it.
-    // otherwise, copy the file (.mod or .resource) to pipe dir and return it.
-    return isTsFile
-        ? createHardLink(file, hardlinkOrCopiedFile)
-        : copyFile(file, hardlinkOrCopiedFile);
+      // if the file is a tsfile, create a hardlink in pipe dir and will return it.
+      // otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
+      final File resultFile =
+          isTsFile
+              ? createHardLink(file, hardlinkOrCopiedFile)
+              : copyFile(file, hardlinkOrCopiedFile);
+      // if the file is not a hardlink or copied file, and there is no related hardlink or copied
+      // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for
+      // the hardlink or copied file, and return the hardlink or copied file.
+      hardlinkOrCopiedFileToPipeTsFileResourceMap.put(
+          resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile));
+      return resultFile;
+    } finally {
+      lock.unlock();
+    }
   }
 
   private boolean increaseReferenceIfExists(String path) {
-    hardlinkOrCopiedFileToReferenceMap.computeIfPresent(path, (key, value) -> value + 1);
-    return hardlinkOrCopiedFileToReferenceMap.containsKey(path);
+    final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path);
+    if (resource != null) {
+      resource.increaseAndGetReference();
+      return true;
+    }
+    return false;
   }
 
   private static File getHardlinkOrCopiedFileInPipeDir(File file) throws IOException {
@@ -158,15 +213,16 @@ public class PipeTsFileResourceManager {
    * @param hardlinkOrCopiedFile the copied or hardlinked file
    * @throws IOException when delete file failed
    */
-  public synchronized void decreaseFileReference(File hardlinkOrCopiedFile) throws IOException {
-    final Integer updatedReference =
-        hardlinkOrCopiedFileToReferenceMap.computeIfPresent(
-            hardlinkOrCopiedFile.getPath(), (file, reference) -> reference - 1);
-
-    if (updatedReference != null && updatedReference == 0) {
-      Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
-      hardlinkOrCopiedFileToReferenceMap.remove(hardlinkOrCopiedFile.getPath());
-      fileNameToFileMap.remove(hardlinkOrCopiedFile.getPath());
+  public void decreaseFileReference(File hardlinkOrCopiedFile) {
+    lock.lock();
+    try {
+      final String filePath = hardlinkOrCopiedFile.getPath();
+      final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
+      if (resource != null) {
+        resource.decreaseAndGetReference();
+      }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -176,19 +232,89 @@ public class PipeTsFileResourceManager {
    * @param hardlinkOrCopiedFile the copied or hardlinked file
    * @return the reference count of the file
    */
-  public synchronized int getFileReferenceCount(File hardlinkOrCopiedFile) {
-    return hardlinkOrCopiedFileToReferenceMap.getOrDefault(hardlinkOrCopiedFile.getPath(), 0);
+  public int getFileReferenceCount(File hardlinkOrCopiedFile) {
+    lock.lock();
+    try {
+      final String filePath = hardlinkOrCopiedFile.getPath();
+      final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
+      return resource != null ? resource.getReferenceCount() : 0;
+    } finally {
+      lock.unlock();
+    }
   }
 
-  public synchronized void pinTsFileResource(TsFileResource resource) throws IOException {
-    increaseFileReference(resource.getTsFile(), true);
+  /**
+   * Cache maps of the TsFile for further use.
+   *
+   * @return {@code true} if the maps are successfully put into cache or already cached. {@code
+   *     false} if they can not be cached.
+   */
+  public boolean cacheObjectsIfAbsent(File hardlinkOrCopiedTsFile) throws IOException {
+    lock.lock();
+    try {
+      final PipeTsFileResource resource =
+          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
+      return resource != null && resource.cacheObjectsIfAbsent();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public Map<String, List<String>> getDeviceMeasurementsMapFromCache(File hardlinkOrCopiedTsFile)
+      throws IOException {
+    lock.lock();
+    try {
+      final PipeTsFileResource resource =
+          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
+      return resource == null ? null : resource.tryGetDeviceMeasurementsMap();
+    } finally {
+      lock.unlock();
+    }
   }
 
-  public synchronized void unpinTsFileResource(TsFileResource resource) throws IOException {
-    decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+  public Map<String, Boolean> getDeviceIsAlignedMapFromCache(File hardlinkOrCopiedTsFile)
+      throws IOException {
+    lock.lock();
+    try {
+      final PipeTsFileResource resource =
+          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
+      return resource == null ? null : resource.tryGetDeviceIsAlignedMap();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(File hardlinkOrCopiedTsFile)
+      throws IOException {
+    lock.lock();
+    try {
+      final PipeTsFileResource resource =
+          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
+      return resource == null ? null : resource.tryGetMeasurementDataTypeMap();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void pinTsFileResource(TsFileResource resource) throws IOException {
+    lock.lock();
+    try {
+      increaseFileReference(resource.getTsFile(), true);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void unpinTsFileResource(TsFileResource resource) throws IOException {
+    lock.lock();
+    try {
+      decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+    } finally {
+      lock.unlock();
+    }
   }
 
   public int getLinkedTsfileCount() {
-    return hardlinkOrCopiedFileToReferenceMap.size();
+    return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 951ad738460..9d1e530a19d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -40,7 +40,7 @@ public abstract class PipeWALResource implements Closeable {
 
   private final AtomicInteger referenceCount;
 
-  public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
+  public static final long WAL_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
   private final AtomicLong lastLogicalPinTime;
   private final AtomicBoolean isPhysicallyPinned;
 
@@ -118,7 +118,7 @@ public abstract class PipeWALResource implements Closeable {
    */
   private boolean unpinPhysicallyIfOutOfTimeToLive() {
     if (isPhysicallyPinned.get()) {
-      if (System.currentTimeMillis() - lastLogicalPinTime.get() > MIN_TIME_TO_LIVE_IN_MS) {
+      if (System.currentTimeMillis() - lastLogicalPinTime.get() > WAL_MIN_TIME_TO_LIVE_IN_MS) {
         try {
           unpinInternal();
         } catch (MemTablePinException e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 1b3dff90fd8..ad1baf1b8a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -19,9 +19,7 @@
 
 package org.apache.iotdb.db.pipe.resource.wal;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
 import org.slf4j.Logger;
@@ -32,8 +30,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 public abstract class PipeWALResourceManager {
@@ -45,10 +41,6 @@ public abstract class PipeWALResourceManager {
   private static final int SEGMENT_LOCK_COUNT = 32;
   private final ReentrantLock[] memtableIdSegmentLocks;
 
-  private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName());
-
   protected PipeWALResourceManager() {
     // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple threads
     memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>();
@@ -58,38 +50,35 @@ public abstract class PipeWALResourceManager {
       memtableIdSegmentLocks[i] = new ReentrantLock();
     }
 
-    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-        PIPE_WAL_RESOURCE_TTL_CHECKER,
-        () -> {
-          final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
-              memtableIdToPipeWALResourceMap.entrySet().iterator();
-          while (iterator.hasNext()) {
-            final Map.Entry<Long, PipeWALResource> entry = iterator.next();
-            final ReentrantLock lock =
-                memtableIdSegmentLocks[(int) (entry.getKey() % SEGMENT_LOCK_COUNT)];
-
-            lock.lock();
-            try {
-              if (entry.getValue().invalidateIfPossible()) {
-                iterator.remove();
-              } else {
-                LOGGER.info(
-                    "WAL (memtableId {}) is still referenced {} times",
-                    entry.getKey(),
-                    entry.getValue().getReferenceCount());
-              }
-            } finally {
-              lock.unlock();
-            }
-          }
-        },
-        PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
-        PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
-        TimeUnit.MILLISECONDS);
+    PipeAgent.runtime()
+        .registerPeriodicalJob(
+            "PipeWALResourceManager#ttlCheck()",
+            this::ttlCheck,
+            Math.max(PipeWALResource.WAL_MIN_TIME_TO_LIVE_IN_MS / 1000, 1));
   }
 
-  public int getApproximatePinnedWALCount() {
-    return memtableIdToPipeWALResourceMap.size();
+  private void ttlCheck() {
+    final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
+        memtableIdToPipeWALResourceMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<Long, PipeWALResource> entry = iterator.next();
+      final ReentrantLock lock =
+          memtableIdSegmentLocks[(int) (entry.getKey() % SEGMENT_LOCK_COUNT)];
+
+      lock.lock();
+      try {
+        if (entry.getValue().invalidateIfPossible()) {
+          iterator.remove();
+        } else {
+          LOGGER.info(
+              "WAL (memtableId {}) is still referenced {} times",
+              entry.getKey(),
+              entry.getValue().getReferenceCount());
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
   }
 
   public final void pin(final WALEntryHandler walEntryHandler) throws IOException {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
index 04308d09cab..03d408b7743 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResource;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -45,7 +47,9 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.concurrent.TimeUnit;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.fail;
 
 public class PipeTsFileResourceManagerTest {
@@ -61,6 +65,7 @@ public class PipeTsFileResourceManagerTest {
   @Before
   public void setUp() throws Exception {
     pipeTsFileResourceManager = new PipeTsFileResourceManager();
+    PipeAgent.runtime().startPeriodicalJobExecutor();
 
     createTsfile(TS_FILE_NAME);
     creatModsFile(MODS_FILE_NAME);
@@ -145,6 +150,9 @@ public class PipeTsFileResourceManagerTest {
     if (pipeFolder.exists()) {
       FileUtils.deleteDirectory(pipeFolder);
     }
+
+    PipeAgent.runtime().stopPeriodicalJobExecutor();
+    PipeAgent.runtime().clearPeriodicalJobExecutor();
   }
 
   @Test
@@ -218,7 +226,13 @@ public class PipeTsFileResourceManagerTest {
     Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertFalse(Files.exists(originFile.toPath()));
     Assert.assertFalse(Files.exists(originModFile.toPath()));
-    Assert.assertFalse(Files.exists(pipeTsfile.toPath()));
-    Assert.assertFalse(Files.exists(pipeModFile.toPath()));
+    // Pipe TsFile will be cleaned by a timed thread, so we wait some time here.
+    await()
+        .atMost(3 * PipeTsFileResource.TSFILE_MIN_TIME_TO_LIVE_IN_MS, TimeUnit.MILLISECONDS)
+        .untilAsserted(
+            () -> {
+              Assert.assertFalse(Files.exists(pipeTsfile.toPath()));
+              Assert.assertFalse(Files.exists(pipeModFile.toPath()));
+            });
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index f8cd6f46277..bd6c8097f1e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -133,7 +133,6 @@ public enum ThreadName {
   PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
   PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
   PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
-  PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
   STATEFUL_TRIGGER_INFORMATION_UPDATER("Stateful-Trigger-Information-Updater"),
@@ -269,7 +268,6 @@ public enum ThreadName {
               PIPE_RUNTIME_PROCEDURE_SUBMITTER,
               PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
               PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
-              PIPE_WAL_RESOURCE_TTL_CHECKER,
               PIPE_RECEIVER_AIR_GAP_AGENT,
               WINDOW_EVALUATION_SERVICE,
               STATEFUL_TRIGGER_INFORMATION_UPDATER));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5ca93c3bd07..579d59e3720 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -156,7 +156,7 @@ public class CommonConfig {
   private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000;
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L;
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
-  private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 30;
+  private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
 
   private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
   private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; // 50B
diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 0e7fd60f91f..64c747ecae3 100644
--- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -350,6 +350,10 @@ public class TsFileSequenceReader implements AutoCloseable {
     }
   }
 
+  public void clearCachedDeviceMetadata() {
+    cachedDeviceMetadata.clear();
+  }
+
   private Map<String, TimeseriesMetadata> readDeviceMetadataFromDisk(String device)
       throws IOException {
     readFileMetadata();

Reply via email to