This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 35c874415485fe5185edf8f838a86a722b0a4871
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 23 16:11:13 2025 +0800

    Pipe: Fixed the hard-link lock problem & Some pipe CIs on master (#16006)
    
    * Update IoTDBPipePermissionIT.java
    
    * refacotr
    
    * revert=pom
    
    * fix-lock
    
    (cherry picked from commit 376162a3ad9a9e93cf44b13737251b7d5f8129ef)
---
 .../manual/basic/IoTDBPipePermissionIT.java        |  6 ++-
 .../overview/PipeDataNodeSinglePipeMetrics.java    |  6 +--
 .../resource/tsfile/PipeTsFileResourceManager.java | 62 +++++++++++-----------
 3 files changed, 40 insertions(+), 34 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index 890a90cdb53..f991b9bb78e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -207,7 +207,11 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeTableModelDualManualIT {
       return;
     }
 
-    TableModelUtils.createDataBaseAndTable(receiverEnv, "test", "test");
+    try {
+      TableModelUtils.createDataBaseAndTable(receiverEnv, "test", "test");
+    } catch (final Exception ignore) {
+      // Ignore because the db/table may be transferred because sender user 
may see these
+    }
 
     // Exception, block here
     TableModelUtils.assertCountDataAlwaysOnEnv("test", "test", 0, receiverEnv);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 1840093a347..65efae61714 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -404,18 +404,18 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
 
   //////////////////////////// singleton ////////////////////////////
 
-  private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
+  private static class PipeDataNodeSinglePipeMetricsHolder {
 
     private static final PipeDataNodeSinglePipeMetrics INSTANCE =
         new PipeDataNodeSinglePipeMetrics();
 
-    private PipeDataNodeRemainingEventAndTimeMetricsHolder() {
+    private PipeDataNodeSinglePipeMetricsHolder() {
       // Empty constructor
     }
   }
 
   public static PipeDataNodeSinglePipeMetrics getInstance() {
-    return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE;
+    return PipeDataNodeSinglePipeMetricsHolder.INSTANCE;
   }
 
   private PipeDataNodeSinglePipeMetrics() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index d1d1286991b..c1e4ff9ddf1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -91,30 +91,27 @@ public class PipeTsFileResourceManager {
       throws IOException {
     // If the file is already a hardlink or copied file,
     // just increase reference count and return it
-    segmentLock.lock(file);
-    try {
-      if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
-        return file;
-      }
-    } finally {
-      segmentLock.unlock(file);
+    if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
+      return file;
     }
 
     // If the file is not a hardlink or copied file, check if there is a 
related hardlink or
     // copied file in pipe dir. if so, increase reference count and return it
     final File hardlinkOrCopiedFile =
         Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file, 
pipeName) : file;
-    segmentLock.lock(hardlinkOrCopiedFile);
-    try {
-      if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) 
{
-        return 
getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
-      }
 
-      // If the file is a tsfile, create a hardlink in pipe dir and will 
return it.
-      // otherwise, copy the file (.mod or .resource) to pipe dir and will 
return it.
-      final File source = Objects.isNull(sourceFile) ? file : sourceFile;
+    if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
+      return 
getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
+    }
+
+    // If the file is a tsfile, create a hardlink in pipe dir and will return 
it.
+    // otherwise, copy the file (.mod or .resource) to pipe dir and will 
return it.
+    final File source = Objects.isNull(sourceFile) ? file : sourceFile;
+    final File resultFile;
 
-      final File resultFile =
+    segmentLock.lock(hardlinkOrCopiedFile);
+    try {
+      resultFile =
           isTsFile
               ? FileUtils.createHardLink(source, hardlinkOrCopiedFile)
               : FileUtils.copyFile(source, hardlinkOrCopiedFile);
@@ -130,25 +127,29 @@ public class PipeTsFileResourceManager {
         hardlinkOrCopiedFileToTsFilePublicResourceMap.put(
             resultFile.getPath(), new PipeTsFilePublicResource(resultFile));
       }
-
-      increasePublicReference(resultFile, pipeName, isTsFile);
-
-      return resultFile;
     } finally {
       segmentLock.unlock(hardlinkOrCopiedFile);
     }
+    increasePublicReference(resultFile, pipeName, isTsFile);
+    return resultFile;
   }
 
   private boolean increaseReferenceIfExists(
       final File file, final @Nullable String pipeName, final boolean 
isTsFile) throws IOException {
-    final String path = file.getPath();
-    final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
-    if (resource != null) {
-      resource.increaseReferenceCount();
-      increasePublicReference(file, pipeName, isTsFile);
-      return true;
+    segmentLock.lock(file);
+    try {
+      final String path = file.getPath();
+      final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
+      if (resource != null) {
+        resource.increaseReferenceCount();
+      } else {
+        return false;
+      }
+    } finally {
+      segmentLock.unlock(file);
     }
-    return false;
+    increasePublicReference(file, pipeName, isTsFile);
+    return true;
   }
 
   private void increasePublicReference(
@@ -221,12 +222,13 @@ public class PipeTsFileResourceManager {
       if (resource != null && resource.decreaseReferenceCount()) {
         getResourceMap(pipeName).remove(filePath);
       }
-      // Decrease the assigner's file to clear hard-link and memory cache
-      // Note that it does not exist for historical files
-      decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
     } finally {
       segmentLock.unlock(hardlinkOrCopiedFile);
     }
+
+    // Decrease the assigner's file to clear hard-link and memory cache
+    // Note that it does not exist for historical files
+    decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
   }
 
   private void decreasePublicReferenceIfExists(final File file, final 
@Nullable String pipeName) {

Reply via email to