This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 35c874415485fe5185edf8f838a86a722b0a4871 Author: Caideyipi <[email protected]> AuthorDate: Wed Jul 23 16:11:13 2025 +0800 Pipe: Fixed the hard-link lock problem & Some pipe CIs on master (#16006) * Update IoTDBPipePermissionIT.java * refacotr * revert=pom * fix-lock (cherry picked from commit 376162a3ad9a9e93cf44b13737251b7d5f8129ef) --- .../manual/basic/IoTDBPipePermissionIT.java | 6 ++- .../overview/PipeDataNodeSinglePipeMetrics.java | 6 +-- .../resource/tsfile/PipeTsFileResourceManager.java | 62 +++++++++++----------- 3 files changed, 40 insertions(+), 34 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java index 890a90cdb53..f991b9bb78e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java @@ -207,7 +207,11 @@ public class IoTDBPipePermissionIT extends AbstractPipeTableModelDualManualIT { return; } - TableModelUtils.createDataBaseAndTable(receiverEnv, "test", "test"); + try { + TableModelUtils.createDataBaseAndTable(receiverEnv, "test", "test"); + } catch (final Exception ignore) { + // Ignore because the db/table may be transferred because sender user may see these + } // Exception, block here TableModelUtils.assertCountDataAlwaysOnEnv("test", "test", 0, receiverEnv); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java index 1840093a347..65efae61714 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java @@ -404,18 +404,18 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { //////////////////////////// singleton //////////////////////////// - private static class PipeDataNodeRemainingEventAndTimeMetricsHolder { + private static class PipeDataNodeSinglePipeMetricsHolder { private static final PipeDataNodeSinglePipeMetrics INSTANCE = new PipeDataNodeSinglePipeMetrics(); - private PipeDataNodeRemainingEventAndTimeMetricsHolder() { + private PipeDataNodeSinglePipeMetricsHolder() { // Empty constructor } } public static PipeDataNodeSinglePipeMetrics getInstance() { - return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE; + return PipeDataNodeSinglePipeMetricsHolder.INSTANCE; } private PipeDataNodeSinglePipeMetrics() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index d1d1286991b..c1e4ff9ddf1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -91,30 +91,27 @@ public class PipeTsFileResourceManager { throws IOException { // If the file is already a hardlink or copied file, // just increase reference count and return it - segmentLock.lock(file); - try { - if (increaseReferenceIfExists(file, pipeName, isTsFile)) { - return file; - } - } finally { - segmentLock.unlock(file); + if (increaseReferenceIfExists(file, pipeName, isTsFile)) { + return file; } // If the file is not a hardlink or copied file, check if there is a related hardlink or // copied file in pipe dir. if so, increase reference count and return it final File hardlinkOrCopiedFile = Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file, pipeName) : file; - segmentLock.lock(hardlinkOrCopiedFile); - try { - if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) { - return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile(); - } - // If the file is a tsfile, create a hardlink in pipe dir and will return it. - // otherwise, copy the file (.mod or .resource) to pipe dir and will return it. - final File source = Objects.isNull(sourceFile) ? file : sourceFile; + if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) { + return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile(); + } + + // If the file is a tsfile, create a hardlink in pipe dir and will return it. + // otherwise, copy the file (.mod or .resource) to pipe dir and will return it. + final File source = Objects.isNull(sourceFile) ? file : sourceFile; + final File resultFile; - final File resultFile = + segmentLock.lock(hardlinkOrCopiedFile); + try { + resultFile = isTsFile ? FileUtils.createHardLink(source, hardlinkOrCopiedFile) : FileUtils.copyFile(source, hardlinkOrCopiedFile); @@ -130,25 +127,29 @@ public class PipeTsFileResourceManager { hardlinkOrCopiedFileToTsFilePublicResourceMap.put( resultFile.getPath(), new PipeTsFilePublicResource(resultFile)); } - - increasePublicReference(resultFile, pipeName, isTsFile); - - return resultFile; } finally { segmentLock.unlock(hardlinkOrCopiedFile); } + increasePublicReference(resultFile, pipeName, isTsFile); + return resultFile; } private boolean increaseReferenceIfExists( final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException { - final String path = file.getPath(); - final PipeTsFileResource resource = getResourceMap(pipeName).get(path); - if (resource != null) { - resource.increaseReferenceCount(); - increasePublicReference(file, pipeName, isTsFile); - return true; + segmentLock.lock(file); + try { + final String path = file.getPath(); + final PipeTsFileResource resource = getResourceMap(pipeName).get(path); + if (resource != null) { + resource.increaseReferenceCount(); + } else { + return false; + } + } finally { + segmentLock.unlock(file); } - return false; + increasePublicReference(file, pipeName, isTsFile); + return true; } private void increasePublicReference( @@ -221,12 +222,13 @@ public class PipeTsFileResourceManager { if (resource != null && resource.decreaseReferenceCount()) { getResourceMap(pipeName).remove(filePath); } - // Decrease the assigner's file to clear hard-link and memory cache - // Note that it does not exist for historical files - decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName); } finally { segmentLock.unlock(hardlinkOrCopiedFile); } + + // Decrease the assigner's file to clear hard-link and memory cache + // Note that it does not exist for historical files + decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName); } private void decreasePublicReferenceIfExists(final File file, final @Nullable String pipeName) {
