This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new af59fc882a7 [IOTDB-6043] Pipe: a framework to support wal hardlink mode (#10440) (#10694)
af59fc882a7 is described below

commit af59fc882a7f1dfe52c529250d1b47081fe57f45
Author: Itami Sho <[email protected]>
AuthorDate: Fri Jul 28 11:15:29 2023 +0800

    [IOTDB-6043] Pipe: a framework to support wal hardlink mode (#10440) (#10694)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    (cherry picked from commit d4c643c377832f02391eff49a674cd6cc440c073)
---
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   2 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |   4 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   4 +-
 .../PipeHardlinkFileDirStartupCleaner.java         |  10 +-
 .../db/pipe/resource/PipeResourceManager.java      |  31 +++-
 .../PipeTsFileResourceManager.java}                |  25 +--
 .../db/pipe/resource/wal/PipeWALResource.java      |  26 ++--
 .../pipe/resource/wal/PipeWALResourceManager.java  |  46 ++++--
 .../wal/hardlink/PipeWALHardlinkResource.java      |  47 ++++++
 .../hardlink/PipeWALHardlinkResourceManager.java   | 167 +++++++++++++++++++++
 .../wal/selfhost/PipeWALSelfHostResource.java      |  41 +++++
 .../selfhost/PipeWALSelfHostResourceManager.java}  |  32 ++--
 .../dataregion/wal/checkpoint/MemTableInfo.java    |   4 +
 .../storageengine/dataregion/wal/node/WALNode.java |   5 +-
 .../dataregion/wal/utils/WALEntryHandler.java      |  37 ++++-
 ...est.java => PipeTsFileResourceManagerTest.java} |  61 ++++----
 .../PipeWALHardlinkResourceManagerTest.java        | 114 ++++++++++++++
 .../resources/conf/iotdb-common.properties         |  16 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  36 ++++-
 .../iotdb/commons/conf/CommonDescriptor.java       |  10 ++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  22 +++
 21 files changed, 620 insertions(+), 120 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index a8c966ad408..391bdc97ba2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import 
org.apache.iotdb.db.pipe.resource.file.PipeHardlinkFileDirStartupCleaner;
+import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index abb48185db3..7162cdabeca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -75,7 +75,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   @Override
   public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
     try {
-      PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(), 
walEntryHandler);
+      PipeResourceManager.wal().pin(walEntryHandler);
       return true;
     } catch (Exception e) {
       LOGGER.warn(
@@ -90,7 +90,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   @Override
   public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
     try {
-      PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
+      PipeResourceManager.wal().unpin(walEntryHandler);
       return true;
     } catch (Exception e) {
       LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 7f164ab44fb..bb260bf22ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -109,7 +109,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   @Override
   public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
     try {
-      tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
+      tsFile = PipeResourceManager.tsfile().increaseFileReference(tsFile, 
true);
       return true;
     } catch (Exception e) {
       LOGGER.warn(
@@ -124,7 +124,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   @Override
   public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
     try {
-      PipeResourceManager.file().decreaseFileReference(tsFile);
+      PipeResourceManager.tsfile().decreaseFileReference(tsFile);
       return true;
     } catch (Exception e) {
       LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeHardlinkFileDirStartupCleaner.java
similarity index 87%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeHardlinkFileDirStartupCleaner.java
index 1acadceb5fd..992d599703f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeHardlinkFileDirStartupCleaner.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource.file;
+package org.apache.iotdb.db.pipe.resource;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,14 +39,14 @@ public class PipeHardlinkFileDirStartupCleaner {
    * PipeConfig.PIPE_TSFILE_DIR_NAME directory.
    */
   public static void clean() {
-    for (String dataDir : 
IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
-      for (File file :
+    for (final String dataDir : 
IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
+      for (final File file :
           FileUtils.listFilesAndDirs(
               new File(dataDir), DirectoryFileFilter.INSTANCE, 
DirectoryFileFilter.INSTANCE)) {
         if (file.isDirectory()
-            && 
file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName())) 
{
+            && 
file.getName().equals(PipeConfig.getInstance().getPipeHardlinkBaseDirName())) {
           LOGGER.info(
-              "pipe hardlink tsfile dir found, deleting it: {}, result: {}",
+              "pipe hardlink dir found, deleting it: {}, result: {}",
               file,
               FileUtils.deleteQuietly(file));
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 43bddd872f6..519aa0b56ab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -19,27 +19,42 @@
 
 package org.apache.iotdb.db.pipe.resource;
 
-import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+import 
org.apache.iotdb.db.pipe.resource.wal.hardlink.PipeWALHardlinkResourceManager;
+import 
org.apache.iotdb.db.pipe.resource.wal.selfhost.PipeWALSelfHostResourceManager;
+
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeResourceManager {
 
-  private final PipeFileResourceManager pipeFileResourceManager;
-  private final PipeWALResourceManager pipeWALResourceManager;
+  private final PipeTsFileResourceManager pipeTsFileResourceManager;
+  private final AtomicReference<PipeWALResourceManager> pipeWALResourceManager;
 
-  public static PipeFileResourceManager file() {
-    return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
+  public static PipeTsFileResourceManager tsfile() {
+    return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager;
   }
 
   public static PipeWALResourceManager wal() {
-    return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager;
+    if (PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager.get() == 
null) {
+      synchronized (PipeResourceManagerHolder.INSTANCE) {
+        if (PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager.get() == 
null) {
+          PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager.set(
+              PipeConfig.getInstance().getPipeHardLinkWALEnabled()
+                  ? new PipeWALHardlinkResourceManager()
+                  : new PipeWALSelfHostResourceManager());
+        }
+      }
+    }
+    return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager.get();
   }
 
   ///////////////////////////// SINGLETON /////////////////////////////
 
   private PipeResourceManager() {
-    pipeFileResourceManager = new PipeFileResourceManager();
-    pipeWALResourceManager = new PipeWALResourceManager();
+    pipeTsFileResourceManager = new PipeTsFileResourceManager();
+    pipeWALResourceManager = new AtomicReference<>();
   }
 
   private static class PipeResourceManagerHolder {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
similarity index 90%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 4b04225cf58..08f1e7db2e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -17,11 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource.file;
+package org.apache.iotdb.db.pipe.resource.tsfile;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.utils.FileUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,7 +30,7 @@ import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 
-public class PipeFileResourceManager {
+public class PipeTsFileResourceManager {
 
   private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new 
HashMap<>();
 
@@ -98,11 +97,12 @@ public class PipeFileResourceManager {
 
   private static String getPipeTsFileDirPath(File file) throws IOException {
     while (!file.getName().equals(IoTDBConstant.SEQUENCE_FLODER_NAME)
-        && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)
-        && 
!file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName()))
 {
+        && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)) {
       file = file.getParentFile();
     }
     return file.getParentFile().getCanonicalPath()
+        + File.separator
+        + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
         + File.separator
         + PipeConfig.getInstance().getPipeHardlinkTsFileDirName();
   }
@@ -110,8 +110,7 @@ public class PipeFileResourceManager {
   private static String getRelativeFilePath(File file) {
     StringBuilder builder = new StringBuilder(file.getName());
     while (!file.getName().equals(IoTDBConstant.SEQUENCE_FLODER_NAME)
-        && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)
-        && 
!file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName()))
 {
+        && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)) {
       file = file.getParentFile();
       builder =
           new StringBuilder(file.getName())
@@ -165,18 +164,6 @@ public class PipeFileResourceManager {
     }
   }
 
-  /**
-   * clear all hardlink or copied files under pipe dir of the given data dir.
-   *
-   * <p>this method can be only invoked when the system is booting up.
-   */
-  public synchronized void clear(String dataDir) {
-    File pipeTsFileDir = new File(dataDir, 
PipeConfig.getInstance().getPipeHardlinkTsFileDirName());
-    if (pipeTsFileDir.exists()) {
-      FileUtils.deleteDirectory(pipeTsFileDir);
-    }
-  }
-
   /**
    * get the reference count of the file.
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 27b79ec42a5..085c71d5f9c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -32,11 +32,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class PipeWALResource implements Closeable {
+public abstract class PipeWALResource implements Closeable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeWALResource.class);
 
-  private final WALEntryHandler walEntryHandler;
+  protected final WALEntryHandler walEntryHandler;
 
   private final AtomicInteger referenceCount;
 
@@ -44,7 +44,7 @@ public class PipeWALResource implements Closeable {
   private final AtomicLong lastLogicalPinTime;
   private final AtomicBoolean isPhysicallyPinned;
 
-  public PipeWALResource(WALEntryHandler walEntryHandler) {
+  protected PipeWALResource(WALEntryHandler walEntryHandler) {
     this.walEntryHandler = walEntryHandler;
 
     referenceCount = new AtomicInteger(0);
@@ -53,11 +53,11 @@ public class PipeWALResource implements Closeable {
     isPhysicallyPinned = new AtomicBoolean(false);
   }
 
-  public void pin() throws PipeRuntimeNonCriticalException {
+  public final void pin() throws PipeRuntimeNonCriticalException {
     if (referenceCount.get() == 0) {
       if (!isPhysicallyPinned.get()) {
         try {
-          walEntryHandler.pinMemTable();
+          pinInternal();
         } catch (MemTablePinException e) {
           throw new PipeRuntimeNonCriticalException(
               String.format(
@@ -75,7 +75,10 @@ public class PipeWALResource implements Closeable {
     referenceCount.incrementAndGet();
   }
 
-  public void unpin() throws PipeRuntimeNonCriticalException {
+  protected abstract void pinInternal()
+      throws MemTablePinException, PipeRuntimeNonCriticalException;
+
+  public final void unpin() throws PipeRuntimeNonCriticalException {
     final int finalReferenceCount = referenceCount.get();
 
     if (finalReferenceCount == 1) {
@@ -90,12 +93,15 @@ public class PipeWALResource implements Closeable {
     referenceCount.decrementAndGet();
   }
 
+  protected abstract void unpinInternal()
+      throws MemTablePinException, PipeRuntimeNonCriticalException;
+
   /**
    * Invalidate the wal if it is unpinned and out of time to live.
    *
    * @return true if the wal is invalidated, false otherwise
    */
-  public boolean invalidateIfPossible() {
+  public final boolean invalidateIfPossible() {
     if (referenceCount.get() > 0) {
       return false;
     }
@@ -114,7 +120,7 @@ public class PipeWALResource implements Closeable {
     if (isPhysicallyPinned.get()) {
       if (System.currentTimeMillis() - lastLogicalPinTime.get() > 
MIN_TIME_TO_LIVE_IN_MS) {
         try {
-          walEntryHandler.unpinMemTable();
+          unpinInternal();
         } catch (MemTablePinException e) {
           throw new PipeRuntimeNonCriticalException(
               String.format(
@@ -138,10 +144,10 @@ public class PipeWALResource implements Closeable {
   }
 
   @Override
-  public void close() {
+  public final void close() {
     if (isPhysicallyPinned.get()) {
       try {
-        walEntryHandler.unpinMemTable();
+        unpinInternal();
       } catch (MemTablePinException e) {
         LOGGER.error(
             "failed to unpin wal {} when closing pipe wal resource, because 
{}",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index e23f2e897d5..64cd83f1a67 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -1,10 +1,30 @@
-/*  * Licensed to the Apache Software Foundation (ASF) under one  * or more 
contributor license agreements.  See the NOTICE file  * distributed with this 
work for additional information  * regarding copyright ownership.  The ASF 
licenses this file  * to you under the Apache License, Version 2.0 (the  * 
"License"); you may not use this file except in compliance  * with the License. 
 You may obtain a copy of the License at  *  *     
http://www.apache.org/licenses/LICENSE-2.0  *  * Unless r [...]
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -12,9 +32,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class PipeWALResourceManager {
+public abstract class PipeWALResourceManager {
 
-  private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
+  protected final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
 
   private static final int SEGMENT_LOCK_COUNT = 32;
   private final ReentrantLock[] memtableIdSegmentLocks;
@@ -23,7 +43,7 @@ public class PipeWALResourceManager {
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
           ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName());
 
-  public PipeWALResourceManager() {
+  protected PipeWALResourceManager() {
     // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple 
threads
     memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>();
 
@@ -57,27 +77,33 @@ public class PipeWALResourceManager {
         TimeUnit.MILLISECONDS);
   }
 
-  public void pin(long memtableId, WALEntryHandler walEntryHandler) {
+  public final void pin(final WALEntryHandler walEntryHandler) throws 
IOException {
+    final long memtableId = walEntryHandler.getMemTableId();
     final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % 
SEGMENT_LOCK_COUNT)];
 
     lock.lock();
     try {
-      memtableIdToPipeWALResourceMap
-          .computeIfAbsent(memtableId, id -> new 
PipeWALResource(walEntryHandler))
-          .pin();
+      pinInternal(memtableId, walEntryHandler);
     } finally {
       lock.unlock();
     }
   }
 
-  public void unpin(long memtableId) {
+  protected abstract void pinInternal(long memtableId, WALEntryHandler 
walEntryHandler)
+      throws IOException;
+
+  public final void unpin(final WALEntryHandler walEntryHandler) throws 
IOException {
+    final long memtableId = walEntryHandler.getMemTableId();
     final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % 
SEGMENT_LOCK_COUNT)];
 
     lock.lock();
     try {
-      memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+      unpinInternal(memtableId, walEntryHandler);
     } finally {
       lock.unlock();
     }
   }
+
+  protected abstract void unpinInternal(long memtableId, WALEntryHandler 
walEntryHandler)
+      throws IOException;
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResource.java
new file mode 100644
index 00000000000..f1ad513ccc4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResource.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal.hardlink;
+
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+
+public class PipeWALHardlinkResource extends PipeWALResource {
+
+  private final PipeWALHardlinkResourceManager resourceManager;
+
+  protected PipeWALHardlinkResource(
+      WALEntryHandler walEntryHandler, PipeWALHardlinkResourceManager 
resourceManager) {
+    super(walEntryHandler);
+    this.resourceManager = resourceManager;
+  }
+
+  @Override
+  protected void pinInternal() throws MemTablePinException {
+    // TODO: hardlink
+    walEntryHandler.pinMemTable();
+  }
+
+  @Override
+  protected void unpinInternal() throws MemTablePinException {
+    // TODO: hardlink
+    walEntryHandler.unpinMemTable();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
new file mode 100644
index 00000000000..acd64cf23b4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal.hardlink;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeWALHardlinkResourceManager extends PipeWALResourceManager {
+
+  @Override
+  protected void pinInternal(long memtableId, WALEntryHandler walEntryHandler) 
{
+    memtableIdToPipeWALResourceMap
+        .computeIfAbsent(memtableId, id -> new 
PipeWALHardlinkResource(walEntryHandler, this))
+        .pin();
+  }
+
+  @Override
+  protected void unpinInternal(long memtableId, WALEntryHandler 
walEntryHandler) {
+    memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+  }
+
+  //////////////////////////// hardlink related ////////////////////////////
+
+  private final Map<String, Integer> hardlinkToReferenceMap = new HashMap<>();
+
+  /**
+   * Given a file, create a hardlink, maintain a reference count for the hardlink, and return the
+   * hardlink.
+   *
+   * <p>If the given file is already a hardlink, increase its reference count and return it.
+   *
+   * <p>If the given file is a wal, create a hardlink in pipe dir, increase the reference count of
+   * the hardlink and return it.
+   *
+   * @param file wal file. It can be the original file or a hardlink of the original file
+   * @return the hardlink
+   * @throws IOException when creating the hardlink fails
+   */
+  public synchronized File increaseFileReference(File file) throws IOException 
{
+    // if the file is already a hardlink, just increase reference count and 
return it
+    if (increaseReferenceIfExists(file.getPath())) {
+      return file;
+    }
+
+    // if the file is not a hardlink, check if there is a related hardlink in 
pipe dir. if so,
+    // increase reference count and return it.
+    final File hardlink = getHardlinkInPipeWALDir(file);
+    if (increaseReferenceIfExists(hardlink.getPath())) {
+      return hardlink;
+    }
+
+    // if the file is a wal, and there is no related hardlink in pipe dir, 
create a hardlink to pipe
+    // dir, maintain a reference count for the hardlink, and return the 
hardlink.
+    hardlinkToReferenceMap.put(hardlink.getPath(), 1);
+    return createHardlink(file, hardlink);
+  }
+
+  private boolean increaseReferenceIfExists(String path) {
+    hardlinkToReferenceMap.computeIfPresent(path, (k, v) -> v + 1);
+    return hardlinkToReferenceMap.containsKey(path);
+  }
+
+  // TODO: Check me! Make sure the file is not a hardlink.
+  // TODO: If the user specifies a wal by config, will this method work?
+  private static File getHardlinkInPipeWALDir(File file) throws IOException {
+    try {
+      return new File(getPipeWALDirPath(file), getRelativeFilePath(file));
+    } catch (Exception e) {
+      throw new IOException(
+          String.format(
+              "failed to get hardlink in pipe dir " + "for file %s, it is not 
a wal",
+              file.getPath()),
+          e);
+    }
+  }
+
+  private static String getPipeWALDirPath(File file) throws IOException {
+    while (!file.getName().equals(IoTDBConstant.WAL_FOLDER_NAME)) {
+      file = file.getParentFile();
+    }
+
+    return file.getParentFile().getCanonicalPath()
+        + File.separator
+        + IoTDBConstant.DATA_FOLDER_NAME
+        + File.separator
+        + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
+        + File.separator
+        + PipeConfig.getInstance().getPipeHardlinkWALDirName();
+  }
+
+  private static String getRelativeFilePath(File file) {
+    StringBuilder builder = new StringBuilder(file.getName());
+    while 
(!file.getParentFile().getName().equals(IoTDBConstant.WAL_FOLDER_NAME)) {
+      file = file.getParentFile();
+      builder =
+          new StringBuilder(file.getName())
+              .append(IoTDBConstant.FILE_NAME_SEPARATOR)
+              .append(builder);
+    }
+    return builder.toString();
+  }
+
+  private static File createHardlink(File sourceFile, File hardlink) throws 
IOException {
+    if (!hardlink.getParentFile().exists() && 
!hardlink.getParentFile().mkdirs()) {
+      throw new IOException(
+          String.format(
+              "failed to create hardlink %s for file %s: failed to create 
parent dir %s",
+              hardlink.getPath(), sourceFile.getPath(), 
hardlink.getParentFile().getPath()));
+    }
+
+    final Path sourcePath = 
FileSystems.getDefault().getPath(sourceFile.getAbsolutePath());
+    final Path linkPath = 
FileSystems.getDefault().getPath(hardlink.getAbsolutePath());
+    Files.createLink(linkPath, sourcePath);
+    return hardlink;
+  }
+
+  /**
+   * Given a hardlink, decrease its reference count. If the reference count is 0, delete the file.
+   * If the given file is not a hardlink, do nothing.
+   *
+   * @param hardlink the hardlinked file
+   * @throws IOException when deleting the file fails
+   */
+  public synchronized void decreaseFileReference(File hardlink) throws 
IOException {
+    final Integer updatedReference =
+        hardlinkToReferenceMap.computeIfPresent(
+            hardlink.getPath(), (file, reference) -> reference - 1);
+
+    if (updatedReference != null && updatedReference == 0) {
+      Files.deleteIfExists(hardlink.toPath());
+      hardlinkToReferenceMap.remove(hardlink.getPath());
+    }
+  }
+
+  @TestOnly
+  public synchronized int getFileReferenceCount(File hardlink) {
+    return hardlinkToReferenceMap.getOrDefault(hardlink.getPath(), 0);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResource.java
new file mode 100644
index 00000000000..e8e03e64a16
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal.selfhost;
+
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+
+/**
+ * A {@link PipeWALResource} whose pin and unpin operations are forwarded to the memtable pin and
+ * unpin calls of the wrapped {@link WALEntryHandler}.
+ */
+public class PipeWALSelfHostResource extends PipeWALResource {
+
+  public PipeWALSelfHostResource(WALEntryHandler walEntryHandler) {
+    super(walEntryHandler);
+  }
+
+  /** Pins by calling {@code pinMemTable()} on the wrapped handler. */
+  @Override
+  protected void pinInternal() throws MemTablePinException {
+    this.walEntryHandler.pinMemTable();
+  }
+
+  /** Unpins by calling {@code unpinMemTable()} on the wrapped handler. */
+  @Override
+  protected void unpinInternal() throws MemTablePinException {
+    this.walEntryHandler.unpinMemTable();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
index 43bddd872f6..94404eafbed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
@@ -17,32 +17,22 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource;
+package org.apache.iotdb.db.pipe.resource.wal.selfhost;
 
-import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
 import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
-public class PipeResourceManager {
+public class PipeWALSelfHostResourceManager extends PipeWALResourceManager {
 
-  private final PipeFileResourceManager pipeFileResourceManager;
-  private final PipeWALResourceManager pipeWALResourceManager;
-
-  public static PipeFileResourceManager file() {
-    return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
-  }
-
-  public static PipeWALResourceManager wal() {
-    return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager;
-  }
-
-  ///////////////////////////// SINGLETON /////////////////////////////
-
-  private PipeResourceManager() {
-    pipeFileResourceManager = new PipeFileResourceManager();
-    pipeWALResourceManager = new PipeWALResourceManager();
+  @Override
+  protected void pinInternal(long memtableId, WALEntryHandler walEntryHandler) 
{
+    memtableIdToPipeWALResourceMap
+        .computeIfAbsent(memtableId, id -> new 
PipeWALSelfHostResource(walEntryHandler))
+        .pin();
   }
 
-  private static class PipeResourceManagerHolder {
-    private static final PipeResourceManager INSTANCE = new 
PipeResourceManager();
+  @Override
+  protected void unpinInternal(long memtableId, WALEntryHandler 
walEntryHandler) {
+    memtableIdToPipeWALResourceMap.get(memtableId).unpin();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
index 322634980b5..75ab7d9f6b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
@@ -116,6 +116,10 @@ public class MemTableInfo implements SerializedSize {
     return pinCount > 0;
   }
 
+  public int getPinCount() {
+    return pinCount;
+  }
+
   public boolean isFlushed() {
     return flushed;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index e8c34f82967..8ba4b3149a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -372,11 +372,12 @@ public class WALNode implements IWALNode {
       }
       if (oldestMemTableInfo.isPinned()) {
         logger.warn(
-            "Pipe: Effective information ratio {} of wal node-{} is below wal 
min effective info ratio {}. But fail to delete memTable-{}'s wal files because 
they are pinned by the Pipe module.",
+            "Pipe: Effective information ratio {} of wal node-{} is below wal 
min effective info ratio {}. But fail to delete memTable-{}'s wal files because 
they are pinned by the Pipe module. Pin count: {}.",
             effectiveInfoRatio,
             identifier,
             config.getWalMinEffectiveInfoRatio(),
-            oldestMemTableInfo.getMemTableId());
+            oldestMemTableInfo.getMemTableId(),
+            oldestMemTableInfo.getPinCount());
         return false;
       }
       IMemTable oldestMemTable = oldestMemTableInfo.getMemTable();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index 27b38844f33..abcf3643ce2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -28,11 +28,15 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * This handler is used by the Pipe to find the corresponding insert node. 
Besides, it can try to
  * pin/unpin the wal entries by the memTable id.
  */
 public class WALEntryHandler {
+
   private static final Logger logger = 
LoggerFactory.getLogger(WALEntryHandler.class);
 
   private long memTableId = -1;
@@ -46,6 +50,9 @@ public class WALEntryHandler {
   // wal node, null when wal is disabled
   private WALNode walNode = null;
 
+  private volatile boolean isHardlink = false;
+  private final AtomicReference<File> hardlinkFile = new AtomicReference<>();
+
   public WALEntryHandler(WALEntryValue value) {
     this.value = value;
   }
@@ -90,6 +97,7 @@ public class WALEntryHandler {
         throw new WALPipeException("Fail to get value because the entry type 
isn't InsertNode.");
       }
     }
+
     // wait until the position is ready
     while (!walEntryPosition.canRead()) {
       try {
@@ -101,14 +109,8 @@ public class WALEntryHandler {
         Thread.currentThread().interrupt();
       }
     }
-    // read from the wal file
-    InsertNode node = null;
-    try {
-      node = walEntryPosition.readInsertNodeViaCache();
-    } catch (Exception e) {
-      throw new WALPipeException("Fail to get value because the file content 
isn't correct.", e);
-    }
 
+    final InsertNode node = isHardlink ? readFromHardlinkFile() : 
readFromOriginalWALFile();
     if (node == null) {
       throw new WALPipeException(
           String.format("Fail to get the wal value of the position %s.", 
walEntryPosition));
@@ -116,6 +118,22 @@ public class WALEntryHandler {
     return node;
   }
 
+  private InsertNode readFromOriginalWALFile() throws WALPipeException {
+    try {
+      return walEntryPosition.readInsertNodeViaCache();
+    } catch (Exception e) {
+      throw new WALPipeException("Fail to get value because the file content 
isn't correct.", e);
+    }
+  }
+
+  private InsertNode readFromHardlinkFile() throws WALPipeException {
+    try {
+      return walEntryPosition.readInsertNodeViaCache();
+    } catch (Exception e) {
+      throw new WALPipeException("Fail to get value because the file content 
isn't correct.", e);
+    }
+  }
+
   public long getMemTableId() {
     return memTableId;
   }
@@ -149,6 +167,11 @@ public class WALEntryHandler {
     this.walEntryPosition.setSize(size);
   }
 
+  /**
+   * Marks this handler as hardlink-backed and records the given hardlink file.
+   *
+   * @param hardlinkFile the file to record as this handler's hardlink
+   */
+  public void hardlinkTo(File hardlinkFile) {
+    // Publish the file before raising the volatile flag, so that a thread observing
+    // isHardlink == true never sees an unset hardlink file.
+    this.hardlinkFile.set(hardlinkFile);
+    isHardlink = true;
+  }
+
   @Override
   public String toString() {
     return "WALEntryHandler{"
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
similarity index 74%
rename from 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
rename to 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
index 390ba19f53f..ef9496901f3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -48,7 +48,7 @@ import java.nio.file.Files;
 
 import static org.junit.Assert.fail;
 
-public class PipeFileResourceManagerTest {
+public class PipeTsFileResourceManagerTest {
 
   private static final String ROOT_DIR = "target" + File.separator + 
"PipeTsFileHolderTest";
   private static final String SEQUENCE_DIR =
@@ -56,11 +56,11 @@ public class PipeFileResourceManagerTest {
   private static final String TS_FILE_NAME = SEQUENCE_DIR + File.separator + 
"test.tsfile";
   private static final String MODS_FILE_NAME = TS_FILE_NAME + ".mods";
 
-  private PipeFileResourceManager pipeFileResourceManager;
+  private PipeTsFileResourceManager pipeTsFileResourceManager;
 
   @Before
   public void setUp() throws Exception {
-    pipeFileResourceManager = new PipeFileResourceManager();
+    pipeTsFileResourceManager = new PipeTsFileResourceManager();
 
     createTsfile(TS_FILE_NAME);
     creatModsFile(MODS_FILE_NAME);
@@ -151,27 +151,32 @@ public class PipeFileResourceManagerTest {
   public void testIncreaseTsfile() throws IOException {
     File originTsfile = new File(TS_FILE_NAME);
     File originModFile = new File(MODS_FILE_NAME);
-    Assert.assertEquals(0, 
pipeFileResourceManager.getFileReferenceCount(originTsfile));
-    Assert.assertEquals(0, 
pipeFileResourceManager.getFileReferenceCount(originModFile));
+    Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(originTsfile));
+    Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(originModFile));
 
-    File pipeTsfile = 
pipeFileResourceManager.increaseFileReference(originTsfile, true);
-    File pipeModFile = 
pipeFileResourceManager.increaseFileReference(originModFile, false);
-    Assert.assertEquals(1, 
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
-    Assert.assertEquals(1, 
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+    File pipeTsfile = 
pipeTsFileResourceManager.increaseFileReference(originTsfile, true);
+    File pipeModFile = 
pipeTsFileResourceManager.increaseFileReference(originModFile, false);
+    Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+    Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertTrue(Files.exists(originTsfile.toPath()));
     Assert.assertTrue(Files.exists(originModFile.toPath()));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
 
+    pipeTsFileResourceManager.increaseFileReference(originTsfile, true);
+    pipeTsFileResourceManager.increaseFileReference(originModFile, false);
+    Assert.assertEquals(2, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+    Assert.assertEquals(2, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+
     // test use hardlinkTsFile to increase reference counts
-    pipeFileResourceManager.increaseFileReference(pipeTsfile, true);
-    Assert.assertEquals(2, 
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+    pipeTsFileResourceManager.increaseFileReference(pipeTsfile, true);
+    Assert.assertEquals(3, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
     Assert.assertTrue(Files.exists(originTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
 
     // test use copyFile to increase reference counts
-    pipeFileResourceManager.increaseFileReference(pipeModFile, false);
-    Assert.assertEquals(2, 
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+    pipeTsFileResourceManager.increaseFileReference(pipeModFile, false);
+    Assert.assertEquals(3, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertTrue(Files.exists(originModFile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
   }
@@ -181,15 +186,15 @@ public class PipeFileResourceManagerTest {
     File originFile = new File(TS_FILE_NAME);
     File originModFile = new File(MODS_FILE_NAME);
 
-    pipeFileResourceManager.decreaseFileReference(originFile);
-    pipeFileResourceManager.decreaseFileReference(originModFile);
-    Assert.assertEquals(0, 
pipeFileResourceManager.getFileReferenceCount(originFile));
-    Assert.assertEquals(0, 
pipeFileResourceManager.getFileReferenceCount(originModFile));
+    pipeTsFileResourceManager.decreaseFileReference(originFile);
+    pipeTsFileResourceManager.decreaseFileReference(originModFile);
+    Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(originFile));
+    Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(originModFile));
 
-    File pipeTsfile = 
pipeFileResourceManager.increaseFileReference(originFile, true);
-    File pipeModFile = 
pipeFileResourceManager.increaseFileReference(originModFile, false);
-    Assert.assertEquals(1, 
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
-    Assert.assertEquals(1, 
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+    File pipeTsfile = 
pipeTsFileResourceManager.increaseFileReference(originFile, true);
+    File pipeModFile = 
pipeTsFileResourceManager.increaseFileReference(originModFile, false);
+    Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+    Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
@@ -200,17 +205,17 @@ public class PipeFileResourceManagerTest {
     Assert.assertFalse(Files.exists(originFile.toPath()));
     Assert.assertFalse(Files.exists(originModFile.toPath()));
 
-    Assert.assertEquals(1, 
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
-    Assert.assertEquals(1, 
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+    Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+    Assert.assertEquals(1, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertFalse(Files.exists(originFile.toPath()));
     Assert.assertFalse(Files.exists(originModFile.toPath()));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
 
-    pipeFileResourceManager.decreaseFileReference(pipeTsfile);
-    pipeFileResourceManager.decreaseFileReference(pipeModFile);
-    Assert.assertEquals(0, 
pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
-    Assert.assertEquals(0, 
pipeFileResourceManager.getFileReferenceCount(pipeModFile));
+    pipeTsFileResourceManager.decreaseFileReference(pipeTsfile);
+    pipeTsFileResourceManager.decreaseFileReference(pipeModFile);
+    Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+    Assert.assertEquals(0, 
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertFalse(Files.exists(originFile.toPath()));
     Assert.assertFalse(Files.exists(originModFile.toPath()));
     Assert.assertFalse(Files.exists(pipeTsfile.toPath()));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeWALHardlinkResourceManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeWALHardlinkResourceManagerTest.java
new file mode 100644
index 00000000000..4427113075e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeWALHardlinkResourceManagerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.FileUtils;
+import 
org.apache.iotdb.db.pipe.resource.wal.hardlink.PipeWALHardlinkResourceManager;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+/** Tests the reference counting of {@link PipeWALHardlinkResourceManager} on a WAL file. */
+public class PipeWALHardlinkResourceManagerTest {
+  private static final String ROOT_DIR = "target" + File.separator + "PipeWALHolderTest";
+
+  private static final String WAL_DIR = ROOT_DIR + File.separator + IoTDBConstant.WAL_FOLDER_NAME;
+
+  private static final String WAL_NAME = WAL_DIR + File.separator + "test.wal";
+
+  private PipeWALHardlinkResourceManager pipeWALHardlinkResourceManager;
+
+  @Before
+  public void setUp() throws Exception {
+    pipeWALHardlinkResourceManager = new PipeWALHardlinkResourceManager();
+
+    createWAL();
+  }
+
+  /** Creates a fresh, empty WAL file at {@link #WAL_NAME}, replacing any leftover one. */
+  private void createWAL() {
+    File file = new File(WAL_NAME);
+    if (file.exists()) {
+      boolean ignored = file.delete();
+    }
+
+    try {
+      file.getParentFile().mkdirs();
+      file.createNewFile();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    File pipeFolder = new File(ROOT_DIR);
+    if (pipeFolder.exists()) {
+      FileUtils.deleteDirectory(pipeFolder);
+    }
+  }
+
+  @Test
+  public void testIncreaseTsfile() throws IOException {
+    File originWALFile = new File(WAL_NAME);
+    Assert.assertEquals(0, pipeWALHardlinkResourceManager.getFileReferenceCount(originWALFile));
+
+    File pipeWALFile = pipeWALHardlinkResourceManager.increaseFileReference(originWALFile);
+    Assert.assertEquals(1, pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+    Assert.assertTrue(Files.exists(originWALFile.toPath()));
+    Assert.assertTrue(Files.exists(pipeWALFile.toPath()));
+
+    // test using the hardlinked WAL file itself to increase the reference count
+    pipeWALHardlinkResourceManager.increaseFileReference(pipeWALFile);
+    Assert.assertEquals(2, pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+    Assert.assertTrue(Files.exists(originWALFile.toPath()));
+    Assert.assertTrue(Files.exists(pipeWALFile.toPath()));
+  }
+
+  @Test
+  public void testDecreaseTsfile() throws IOException {
+    File originFile = new File(WAL_NAME);
+
+    // decreasing a file that was never referenced must leave the count at 0
+    pipeWALHardlinkResourceManager.decreaseFileReference(originFile);
+    Assert.assertEquals(0, pipeWALHardlinkResourceManager.getFileReferenceCount(originFile));
+
+    File pipeWALFile = pipeWALHardlinkResourceManager.increaseFileReference(originFile);
+    Assert.assertEquals(1, pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+    // both the origin file and its hardlink exist before the origin file is deleted
+    Assert.assertTrue(Files.exists(originFile.toPath()));
+    Assert.assertTrue(Files.exists(pipeWALFile.toPath()));
+
+    Assert.assertTrue(originFile.delete());
+    Assert.assertFalse(Files.exists(originFile.toPath()));
+
+    // deleting the origin file affects neither the hardlink nor its reference count
+    Assert.assertEquals(1, pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+    Assert.assertFalse(Files.exists(originFile.toPath()));
+    Assert.assertTrue(Files.exists(pipeWALFile.toPath()));
+
+    pipeWALHardlinkResourceManager.decreaseFileReference(pipeWALFile);
+    Assert.assertEquals(0, pipeWALHardlinkResourceManager.getFileReferenceCount(pipeWALFile));
+    Assert.assertFalse(Files.exists(originFile.toPath()));
+    Assert.assertFalse(Files.exists(pipeWALFile.toPath()));
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index b5eedff05fb..98104de3b5b 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -952,9 +952,21 @@ cluster_name=defaultCluster
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # pipe_lib_dir=ext/pipe
 
-# The name of the directory that stores the tsfiles temporarily hold or 
generated by the pipe module.
+# The name of the directory that stores the files temporarily held or 
generated by the pipe module.
 # The directory is located in the data directory of IoTDB.
-# pipe_hardlink_tsfile_dir_name=pipe
+# pipe_hardlink_base_dir_name=pipe
+
+# The name of the directory that stores the tsfiles temporarily held or 
generated by the pipe module.
+# The directory is located in the pipe_hardlink_base_dir_name directory of 
IoTDB.
+# pipe_hardlink_tsfile_dir_name=tsfile
+
+# The name of the directory that stores the wal files temporarily held or 
generated by the pipe module.
+# The directory is located in the pipe_hardlink_base_dir_name directory of 
IoTDB.
+# pipe_hardlink_wal_dir_name=wal
+
+# Enable hardlink for wal files or not. If enabled, the wal files will be 
hardlink-ed to
+# the pipe_hardlink_wal_dir_name directory.
+# pipe_hardlink_wal_enabled=false
 
 # The row size of tablets created in pipe transfer.
 # pipe_data_structure_tablet_row_size=65536
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3732ac188fc..eb8d5f7b2f7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -141,7 +141,13 @@ public class CommonConfig {
    * The name of the directory that stores the tsfiles temporarily hold or 
generated by the pipe
    * module. The directory is located in the data directory of IoTDB.
    */
-  private String pipeHardlinkTsFileDirName = "pipe";
+  private String pipeHardlinkBaseDirName = "pipe";
+
+  private String pipeHardlinkTsFileDirName = "tsfile";
+
+  private String pipeHardlinkWALDirName = "wal";
+
+  private boolean pipeHardLinkWALEnabled = false;
 
   /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor. */
   private int pipeSubtaskExecutorMaxThreadNum = 5;
@@ -458,12 +464,36 @@ public class CommonConfig {
     return timestampPrecision;
   }
 
+  public String getPipeHardlinkBaseDirName() {
+    return pipeHardlinkBaseDirName;
+  }
+
+  public void setPipeHardlinkBaseDirName(String pipeHardlinkBaseDirName) {
+    this.pipeHardlinkBaseDirName = pipeHardlinkBaseDirName;
+  }
+
   public String getPipeHardlinkTsFileDirName() {
     return pipeHardlinkTsFileDirName;
   }
 
-  public void setPipeHardlinkTsFileDirName(String pipeHardlinkTsFileDirName) {
-    this.pipeHardlinkTsFileDirName = pipeHardlinkTsFileDirName;
+  public void setPipeHardlinkTsFileDirName(String pipeTsFileDirName) {
+    this.pipeHardlinkTsFileDirName = pipeTsFileDirName;
+  }
+
+  public String getPipeHardlinkWALDirName() {
+    return pipeHardlinkWALDirName;
+  }
+
+  public void setPipeHardlinkWALDirName(String pipeWALDirName) {
+    this.pipeHardlinkWALDirName = pipeWALDirName;
+  }
+
+  public boolean getPipeHardLinkWALEnabled() {
+    return pipeHardLinkWALEnabled;
+  }
+
+  public void setPipeHardLinkWALEnabled(boolean pipeHardLinkWALEnabled) {
+    this.pipeHardLinkWALEnabled = pipeHardLinkWALEnabled;
   }
 
   public int getPipeDataStructureTabletRowSize() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ae2ae904fe4..f45c92d479c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -241,9 +241,19 @@ public class CommonDescriptor {
   }
 
   private void loadPipeProps(Properties properties) {
+    config.setPipeHardlinkBaseDirName(
+        properties.getProperty("pipe_hardlink_base_dir_name", 
config.getPipeHardlinkBaseDirName()));
     config.setPipeHardlinkTsFileDirName(
         properties.getProperty(
             "pipe_hardlink_tsfile_dir_name", 
config.getPipeHardlinkTsFileDirName()));
+    config.setPipeHardlinkWALDirName(
+        properties.getProperty("pipe_hardlink_wal_dir_name", 
config.getPipeHardlinkWALDirName()));
+    config.setPipeHardLinkWALEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_hardlink_wal_enabled",
+                Boolean.toString(config.getPipeHardLinkWALEnabled()))));
+
     config.setPipeDataStructureTabletRowSize(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 6bbe69079f2..9cb9f4d70c0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -31,10 +31,22 @@ public class PipeConfig {
 
   /////////////////////////////// File ///////////////////////////////
 
+  public String getPipeHardlinkBaseDirName() {
+    return COMMON_CONFIG.getPipeHardlinkBaseDirName();
+  }
+
   public String getPipeHardlinkTsFileDirName() {
     return COMMON_CONFIG.getPipeHardlinkTsFileDirName();
   }
 
+  public String getPipeHardlinkWALDirName() {
+    return COMMON_CONFIG.getPipeHardlinkWALDirName();
+  }
+
+  public boolean getPipeHardLinkWALEnabled() {
+    return COMMON_CONFIG.getPipeHardLinkWALEnabled();
+  }
+
   /////////////////////////////// Tablet ///////////////////////////////
 
   public int getPipeDataStructureTabletRowSize() {
@@ -126,7 +138,12 @@ public class PipeConfig {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConfig.class);
 
   public void printAllConfigs() {
+    LOGGER.info("PipeHardlinkBaseDirName: {}", getPipeHardlinkBaseDirName());
     LOGGER.info("PipeHardlinkTsFileDirName: {}", 
getPipeHardlinkTsFileDirName());
+    LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());
+    LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
+
+    LOGGER.info("PipeDataStructureTabletRowSize: {}", 
getPipeDataStructureTabletRowSize());
 
     LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}", 
getPipeSubtaskExecutorMaxThreadNum());
     LOGGER.info(
@@ -147,6 +164,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipeExtractorPendingQueueTabletLimit: {}", 
getPipeExtractorPendingQueueTabletLimit());
 
+    LOGGER.info("PipeConnectorTimeoutMs: {}", getPipeConnectorTimeoutMs());
     LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
     LOGGER.info("PipeConnectorRetryIntervalMs: {}", 
getPipeConnectorRetryIntervalMs());
     LOGGER.info("PipeConnectorPendingQueueSize: {}", 
getPipeConnectorPendingQueueSize());
@@ -158,6 +176,10 @@ public class PipeConfig {
     LOGGER.info(
         "PipeMetaSyncerInitialSyncDelayMinutes: {}", 
getPipeMetaSyncerInitialSyncDelayMinutes());
     LOGGER.info("PipeMetaSyncerSyncIntervalMinutes: {}", 
getPipeMetaSyncerSyncIntervalMinutes());
+    LOGGER.info(
+        "PipeMetaSyncerAutoRestartPipeCheckIntervalRound: {}",
+        getPipeMetaSyncerAutoRestartPipeCheckIntervalRound());
+    LOGGER.info("PipeAutoRestartEnabled: {}", getPipeAutoRestartEnabled());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////

Reply via email to