This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 64e65c6df51 [To dev/1.3] Pipe: Fixed hard-link problem (#15984)
(#15986)
64e65c6df51 is described below
commit 64e65c6df512215bed6da073ec24147d73e90f10
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jul 22 09:34:27 2025 +0800
[To dev/1.3] Pipe: Fixed hard-link problem (#15984) (#15986)
* Update PipeTsFileResourceManager.java
* Update PipeDataRegionAssigner.java
* Update PipeTsFileResourceManager.java
* Update PipeTsFileResourceManager.java
---
.../realtime/assigner/PipeDataRegionAssigner.java | 9 ----
.../resource/tsfile/PipeTsFileResourceManager.java | 60 +++++++++++-----------
2 files changed, 31 insertions(+), 38 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 8e415772438..0b83876b186 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
-import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
@@ -39,14 +36,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.util.concurrent.atomic.AtomicReference;
public class PipeDataRegionAssigner implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataRegionAssigner.class);
- private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
-
/**
* The {@link PipeDataRegionMatcher} is used to match the event with the
extractor based on the
* pattern.
@@ -58,9 +52,6 @@ public class PipeDataRegionAssigner implements Closeable {
private final String dataRegionId;
- private final AtomicReference<ProgressIndex>
maxProgressIndexForRealtimeEvent =
- new AtomicReference<>(MinimumProgressIndex.INSTANCE);
-
private final PipeEventCounter eventCounter = new
PipeDataRegionEventCounter();
public String getDataRegionId() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index bf8b6b86b43..86a5d40d375 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -56,6 +56,11 @@ public class PipeTsFileResourceManager {
hardlinkOrCopiedFileToPipeTsFileResourceMap = new ConcurrentHashMap<>();
private final PipeTsFileResourceSegmentLock segmentLock = new
PipeTsFileResourceSegmentLock();
+ public File increaseFileReference(
+ final File file, final boolean isTsFile, final @Nullable String
pipeName) throws IOException {
+ return increaseFileReference(file, isTsFile, pipeName, null);
+ }
+
/**
* Given a file, create a hardlink or copy it to pipe dir, maintain a
reference count for the
* hardlink or copied file, and return the hardlink or copied file.
@@ -72,16 +77,24 @@ public class PipeTsFileResourceManager {
* @param file tsfile, resource file or mod file. can be original file or
hardlink/copy of
* original file
* @param isTsFile {@code true} to create hardlink, {@code false} to copy
file
+ * @param pipeName Nonnull if the pipe is from historical or assigner ->
extractors, null if is
+ * dataRegion -> assigner
+ * @param sourceFile for inner use, historical extractor will use this to
create hardlink from
+ * pipe tsFile -> common tsFile
* @return the hardlink or copied file
* @throws IOException when create hardlink or copy file failed
*/
- public File increaseFileReference(
- final File file, final boolean isTsFile, final @Nullable String
pipeName) throws IOException {
+ private File increaseFileReference(
+ final File file,
+ final boolean isTsFile,
+ final @Nullable String pipeName,
+ final @Nullable File sourceFile)
+ throws IOException {
// If the file is already a hardlink or copied file,
// just increase reference count and return it
segmentLock.lock(file);
try {
- if (increaseReferenceIfExists(file, pipeName)) {
+ if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
return file;
}
} finally {
@@ -90,19 +103,22 @@ public class PipeTsFileResourceManager {
// If the file is not a hardlink or copied file, check if there is a
related hardlink or
// copied file in pipe dir. if so, increase reference count and return it
- final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file,
pipeName);
+ final File hardlinkOrCopiedFile =
+ Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file,
pipeName) : file;
segmentLock.lock(hardlinkOrCopiedFile);
try {
- if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName)) {
+ if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile))
{
return
getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
}
// If the file is a tsfile, create a hardlink in pipe dir and will
return it.
// otherwise, copy the file (.mod or .resource) to pipe dir and will
return it.
+ final File source = Objects.isNull(sourceFile) ? file : sourceFile;
+
final File resultFile =
isTsFile
- ? FileUtils.createHardLink(file, hardlinkOrCopiedFile)
- : FileUtils.copyFile(file, hardlinkOrCopiedFile);
+ ? FileUtils.createHardLink(source, hardlinkOrCopiedFile)
+ : FileUtils.copyFile(source, hardlinkOrCopiedFile);
// If the file is not a hardlink or copied file, and there is no related
hardlink or copied
// file in pipe dir, create a hardlink or copy it to pipe dir, maintain
a reference count for
@@ -116,7 +132,7 @@ public class PipeTsFileResourceManager {
resultFile.getPath(), new PipeTsFilePublicResource(resultFile));
}
- increasePublicReference(resultFile, pipeName);
+ increasePublicReference(resultFile, pipeName, isTsFile);
return resultFile;
} finally {
@@ -124,34 +140,26 @@ public class PipeTsFileResourceManager {
}
}
- private boolean increaseReferenceIfExists(final File file, final @Nullable
String pipeName) {
+ private boolean increaseReferenceIfExists(
+ final File file, final @Nullable String pipeName, final boolean
isTsFile) throws IOException {
final String path = file.getPath();
final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
if (resource != null) {
resource.increaseReferenceCount();
- increasePublicReference(file, pipeName);
+ increasePublicReference(file, pipeName, isTsFile);
return true;
}
return false;
}
- private void increasePublicReference(final File file, final String pipeName)
{
+ private void increasePublicReference(
+ final File file, final String pipeName, final boolean isTsFile) throws
IOException {
if (Objects.isNull(pipeName)) {
return;
}
// Increase the assigner's file to avoid hard-link or memory cache cleaning
// Note that it does not exist for historical files
- final String path = getCommonFilePath(file);
- hardlinkOrCopiedFileToTsFilePublicResourceMap.compute(
- path,
- (k, v) -> {
- if (Objects.isNull(v)) {
- return new PipeTsFilePublicResource(new File(path));
- } else {
- v.increaseReferenceCount();
- return v;
- }
- });
+ increaseFileReference(new File(getCommonFilePath(file)), isTsFile, null,
file);
}
public static File getHardlinkOrCopiedFileInPipeDir(
@@ -228,13 +236,7 @@ public class PipeTsFileResourceManager {
}
// Increase the assigner's file to avoid hard-link or memory cache cleaning
// Note that it does not exist for historical files
- final String commonFilePath = getCommonFilePath(file);
- if
(hardlinkOrCopiedFileToTsFilePublicResourceMap.containsKey(commonFilePath)
- && hardlinkOrCopiedFileToTsFilePublicResourceMap
- .get(commonFilePath)
- .decreaseReferenceCount()) {
- hardlinkOrCopiedFileToPipeTsFileResourceMap.remove(commonFilePath);
- }
+ decreaseFileReference(new File(getCommonFilePath(file)), null);
}
// Warning: Shall not be called by the assigner