This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 58ca1c6a469 Pipe: Fixed the hard-link lock problem & Some pipe CIs on
master (#16006) (#16012)
58ca1c6a469 is described below
commit 58ca1c6a4691181325dfe62bf25d8f1c96f8444e
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 24 17:02:16 2025 +0800
Pipe: Fixed the hard-link lock problem & Some pipe CIs on master (#16006)
(#16012)
* Update IoTDBPipePermissionIT.java
* refactor
* revert pom
* fix-lock
---
.../overview/PipeDataNodeSinglePipeMetrics.java | 6 +--
.../resource/tsfile/PipeTsFileResourceManager.java | 62 +++++++++++-----------
2 files changed, 35 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 1f404507632..70b4561bb5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -408,18 +408,18 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
//////////////////////////// singleton ////////////////////////////
- private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
+ private static class PipeDataNodeSinglePipeMetricsHolder {
private static final PipeDataNodeSinglePipeMetrics INSTANCE =
new PipeDataNodeSinglePipeMetrics();
- private PipeDataNodeRemainingEventAndTimeMetricsHolder() {
+ private PipeDataNodeSinglePipeMetricsHolder() {
// Empty constructor
}
}
public static PipeDataNodeSinglePipeMetrics getInstance() {
- return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE;
+ return PipeDataNodeSinglePipeMetricsHolder.INSTANCE;
}
private PipeDataNodeSinglePipeMetrics() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 86a5d40d375..fd74dadac5e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -92,30 +92,27 @@ public class PipeTsFileResourceManager {
throws IOException {
// If the file is already a hardlink or copied file,
// just increase reference count and return it
- segmentLock.lock(file);
- try {
- if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
- return file;
- }
- } finally {
- segmentLock.unlock(file);
+ if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
+ return file;
}
// If the file is not a hardlink or copied file, check if there is a
related hardlink or
// copied file in pipe dir. if so, increase reference count and return it
final File hardlinkOrCopiedFile =
Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file,
pipeName) : file;
- segmentLock.lock(hardlinkOrCopiedFile);
- try {
- if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile))
{
- return
getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
- }
- // If the file is a tsfile, create a hardlink in pipe dir and will
return it.
- // otherwise, copy the file (.mod or .resource) to pipe dir and will
return it.
- final File source = Objects.isNull(sourceFile) ? file : sourceFile;
+ if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
+ return
getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
+ }
+
+ // If the file is a tsfile, create a hardlink in pipe dir and will return
it.
+ // otherwise, copy the file (.mod or .resource) to pipe dir and will
return it.
+ final File source = Objects.isNull(sourceFile) ? file : sourceFile;
+ final File resultFile;
- final File resultFile =
+ segmentLock.lock(hardlinkOrCopiedFile);
+ try {
+ resultFile =
isTsFile
? FileUtils.createHardLink(source, hardlinkOrCopiedFile)
: FileUtils.copyFile(source, hardlinkOrCopiedFile);
@@ -131,25 +128,29 @@ public class PipeTsFileResourceManager {
hardlinkOrCopiedFileToTsFilePublicResourceMap.put(
resultFile.getPath(), new PipeTsFilePublicResource(resultFile));
}
-
- increasePublicReference(resultFile, pipeName, isTsFile);
-
- return resultFile;
} finally {
segmentLock.unlock(hardlinkOrCopiedFile);
}
+ increasePublicReference(resultFile, pipeName, isTsFile);
+ return resultFile;
}
private boolean increaseReferenceIfExists(
final File file, final @Nullable String pipeName, final boolean
isTsFile) throws IOException {
- final String path = file.getPath();
- final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
- if (resource != null) {
- resource.increaseReferenceCount();
- increasePublicReference(file, pipeName, isTsFile);
- return true;
+ segmentLock.lock(file);
+ try {
+ final String path = file.getPath();
+ final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
+ if (resource != null) {
+ resource.increaseReferenceCount();
+ } else {
+ return false;
+ }
+ } finally {
+ segmentLock.unlock(file);
}
- return false;
+ increasePublicReference(file, pipeName, isTsFile);
+ return true;
}
private void increasePublicReference(
@@ -222,12 +223,13 @@ public class PipeTsFileResourceManager {
if (resource != null && resource.decreaseReferenceCount()) {
getResourceMap(pipeName).remove(filePath);
}
- // Decrease the assigner's file to clear hard-link and memory cache
- // Note that it does not exist for historical files
- decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
} finally {
segmentLock.unlock(hardlinkOrCopiedFile);
}
+
+ // Decrease the assigner's file to clear hard-link and memory cache
+ // Note that it does not exist for historical files
+ decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
}
private void decreasePublicReferenceIfExists(final File file, final
@Nullable String pipeName) {