This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 64e65c6df51 [To dev/1.3] Pipe: Fixed hard-link problem (#15984) (#15986)
64e65c6df51 is described below

commit 64e65c6df512215bed6da073ec24147d73e90f10
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jul 22 09:34:27 2025 +0800

    [To dev/1.3] Pipe: Fixed hard-link problem (#15984) (#15986)
    
    * Update PipeTsFileResourceManager.java
    
    * Update PipeDataRegionAssigner.java
    
    * Update PipeTsFileResourceManager.java
    
    * Update PipeTsFileResourceManager.java
---
 .../realtime/assigner/PipeDataRegionAssigner.java  |  9 ----
 .../resource/tsfile/PipeTsFileResourceManager.java | 60 +++++++++++-----------
 2 files changed, 31 insertions(+), 38 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 8e415772438..0b83876b186 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -19,9 +19,6 @@
 
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
 
-import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
@@ -39,14 +36,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeDataRegionAssigner implements Closeable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataRegionAssigner.class);
 
-  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
-
   /**
    * The {@link PipeDataRegionMatcher} is used to match the event with the extractor based on the
    * pattern.
@@ -58,9 +52,6 @@ public class PipeDataRegionAssigner implements Closeable {
 
   private final String dataRegionId;
 
-  private final AtomicReference<ProgressIndex> maxProgressIndexForRealtimeEvent =
-      new AtomicReference<>(MinimumProgressIndex.INSTANCE);
-
   private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter();
 
   public String getDataRegionId() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index bf8b6b86b43..86a5d40d375 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -56,6 +56,11 @@ public class PipeTsFileResourceManager {
       hardlinkOrCopiedFileToPipeTsFileResourceMap = new ConcurrentHashMap<>();
   private final PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock();
 
+  public File increaseFileReference(
+      final File file, final boolean isTsFile, final @Nullable String pipeName) throws IOException {
+    return increaseFileReference(file, isTsFile, pipeName, null);
+  }
+
   /**
    * Given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the
    * hardlink or copied file, and return the hardlink or copied file.
@@ -72,16 +77,24 @@ public class PipeTsFileResourceManager {
    * @param file tsfile, resource file or mod file. can be original file or hardlink/copy of
    *     original file
    * @param isTsFile {@code true} to create hardlink, {@code false} to copy file
+   * @param pipeName Nonnull if the pipe is from historical or assigner -> extractors, null if is
+   *     dataRegion -> assigner
+   * @param sourceFile for inner use, historical extractor will use this to create hardlink from
+   *     pipe tsFile -> common tsFile
    * @return the hardlink or copied file
    * @throws IOException when create hardlink or copy file failed
    */
-  public File increaseFileReference(
-      final File file, final boolean isTsFile, final @Nullable String pipeName) throws IOException {
+  private File increaseFileReference(
+      final File file,
+      final boolean isTsFile,
+      final @Nullable String pipeName,
+      final @Nullable File sourceFile)
+      throws IOException {
     // If the file is already a hardlink or copied file,
     // just increase reference count and return it
     segmentLock.lock(file);
     try {
-      if (increaseReferenceIfExists(file, pipeName)) {
+      if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
         return file;
       }
     } finally {
@@ -90,19 +103,22 @@ public class PipeTsFileResourceManager {
 
     // If the file is not a hardlink or copied file, check if there is a related hardlink or
     // copied file in pipe dir. if so, increase reference count and return it
-    final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file, pipeName);
+    final File hardlinkOrCopiedFile =
+        Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file, pipeName) : file;
     segmentLock.lock(hardlinkOrCopiedFile);
     try {
-      if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName)) {
+      if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
         return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
       }
 
       // If the file is a tsfile, create a hardlink in pipe dir and will return it.
       // otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
+      final File source = Objects.isNull(sourceFile) ? file : sourceFile;
+
       final File resultFile =
           isTsFile
-              ? FileUtils.createHardLink(file, hardlinkOrCopiedFile)
-              : FileUtils.copyFile(file, hardlinkOrCopiedFile);
+              ? FileUtils.createHardLink(source, hardlinkOrCopiedFile)
+              : FileUtils.copyFile(source, hardlinkOrCopiedFile);
 
       // If the file is not a hardlink or copied file, and there is no related hardlink or copied
       // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for
@@ -116,7 +132,7 @@ public class PipeTsFileResourceManager {
             resultFile.getPath(), new PipeTsFilePublicResource(resultFile));
       }
 
-      increasePublicReference(resultFile, pipeName);
+      increasePublicReference(resultFile, pipeName, isTsFile);
 
       return resultFile;
     } finally {
@@ -124,34 +140,26 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  private boolean increaseReferenceIfExists(final File file, final @Nullable String pipeName) {
+  private boolean increaseReferenceIfExists(
+      final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException {
     final String path = file.getPath();
     final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
     if (resource != null) {
       resource.increaseReferenceCount();
-      increasePublicReference(file, pipeName);
+      increasePublicReference(file, pipeName, isTsFile);
       return true;
     }
     return false;
   }
 
-  private void increasePublicReference(final File file, final String pipeName) {
+  private void increasePublicReference(
+      final File file, final String pipeName, final boolean isTsFile) throws IOException {
     if (Objects.isNull(pipeName)) {
       return;
     }
     // Increase the assigner's file to avoid hard-link or memory cache cleaning
     // Note that it does not exist for historical files
-    final String path = getCommonFilePath(file);
-    hardlinkOrCopiedFileToTsFilePublicResourceMap.compute(
-        path,
-        (k, v) -> {
-          if (Objects.isNull(v)) {
-            return new PipeTsFilePublicResource(new File(path));
-          } else {
-            v.increaseReferenceCount();
-            return v;
-          }
-        });
+    increaseFileReference(new File(getCommonFilePath(file)), isTsFile, null, file);
   }
 
   public static File getHardlinkOrCopiedFileInPipeDir(
@@ -228,13 +236,7 @@ public class PipeTsFileResourceManager {
     }
     // Increase the assigner's file to avoid hard-link or memory cache cleaning
     // Note that it does not exist for historical files
-    final String commonFilePath = getCommonFilePath(file);
-    if (hardlinkOrCopiedFileToTsFilePublicResourceMap.containsKey(commonFilePath)
-        && hardlinkOrCopiedFileToTsFilePublicResourceMap
-            .get(commonFilePath)
-            .decreaseReferenceCount()) {
-      hardlinkOrCopiedFileToPipeTsFileResourceMap.remove(commonFilePath);
-    }
+    decreaseFileReference(new File(getCommonFilePath(file)), null);
   }
 
   // Warning: Shall not be called by the assigner

Reply via email to