This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 32bfeaf60cf Pipe/Load: Fix loaded files may be compacted before listening (#12816)
32bfeaf60cf is described below
commit 32bfeaf60cfa748a55190a81308b4ed5813a7c33
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 26 22:06:29 2024 +0800
Pipe/Load: Fix loaded files may be compacted before listening (#12816)
---
.../db/storageengine/dataregion/DataRegion.java | 54 +++++++++++++---------
1 file changed, 32 insertions(+), 22 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 67eaad0988e..89d11f94adc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2703,10 +2703,12 @@ public class DataRegion implements IDataRegionForQuery {
* @param isGeneratedByPipe whether the load tsfile request is generated by pipe
*/
public void loadNewTsFile(
- TsFileResource newTsFileResource, boolean deleteOriginFile, boolean
isGeneratedByPipe)
+ final TsFileResource newTsFileResource,
+ final boolean deleteOriginFile,
+ final boolean isGeneratedByPipe)
throws LoadFileException {
- File tsfileToBeInserted = newTsFileResource.getTsFile();
- long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
+ final File tsfileToBeInserted = newTsFileResource.getTsFile();
+ final long newFilePartitionId =
newTsFileResource.getTimePartitionWithCheck();
if (!TsFileValidator.getInstance().validateTsFile(newTsFileResource)) {
throw new LoadFileException(
@@ -2716,7 +2718,7 @@ public class DataRegion implements IDataRegionForQuery {
writeLock("loadNewTsFile");
try {
newTsFileResource.setSeq(false);
- String newFileName =
+ final String newFileName =
getNewTsFileName(
System.currentTimeMillis(),
getAndSetNewVersion(newFilePartitionId, newTsFileResource),
@@ -2732,10 +2734,11 @@ public class DataRegion implements IDataRegionForQuery {
fsFactory.getFile(tsfileToBeInserted.getParentFile(),
newFileName));
}
loadTsFileToUnSequence(
- tsfileToBeInserted, newTsFileResource, newFilePartitionId,
deleteOriginFile);
-
- PipeInsertionDataNodeListener.getInstance()
- .listenToTsFile(dataRegionId, newTsFileResource, true,
isGeneratedByPipe);
+ tsfileToBeInserted,
+ newTsFileResource,
+ newFilePartitionId,
+ deleteOriginFile,
+ isGeneratedByPipe);
FileMetrics.getInstance()
.addTsFile(
@@ -2769,7 +2772,7 @@ public class DataRegion implements IDataRegionForQuery {
}
logger.info("TsFile {} is successfully loaded in unsequence list.",
newFileName);
- } catch (DiskSpaceInsufficientException e) {
+ } catch (final DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to database processor {} because the
disk space is insufficient.",
tsfileToBeInserted.getAbsolutePath(),
@@ -2910,12 +2913,13 @@ public class DataRegion implements IDataRegionForQuery {
* @return load the file successfully @UsedBy sync module, load external tsfile module.
*/
private boolean loadTsFileToUnSequence(
- File tsFileToLoad,
- TsFileResource tsFileResource,
- long filePartitionId,
- boolean deleteOriginFile)
+ final File tsFileToLoad,
+ final TsFileResource tsFileResource,
+ final long filePartitionId,
+ final boolean deleteOriginFile,
+ boolean isGeneratedByPipe)
throws LoadFileException, DiskSpaceInsufficientException {
- File targetFile;
+ final File targetFile;
targetFile =
fsFactory.getFile(
TierManager.getInstance().getNextFolderForTsFile(0, false),
@@ -2949,7 +2953,7 @@ public class DataRegion implements IDataRegionForQuery {
} else {
Files.copy(tsFileToLoad.toPath(), targetFile.toPath());
}
- } catch (IOException e) {
+ } catch (final IOException e) {
logger.error(
"File renaming failed when loading tsfile. Origin: {}, Target: {}",
tsFileToLoad.getAbsolutePath(),
@@ -2961,8 +2965,10 @@ public class DataRegion implements IDataRegionForQuery {
tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(),
e.getMessage()));
}
- File resourceFileToLoad = fsFactory.getFile(tsFileToLoad.getAbsolutePath()
+ RESOURCE_SUFFIX);
- File targetResourceFile = fsFactory.getFile(targetFile.getAbsolutePath() +
RESOURCE_SUFFIX);
+ final File resourceFileToLoad =
+ fsFactory.getFile(tsFileToLoad.getAbsolutePath() + RESOURCE_SUFFIX);
+ final File targetResourceFile =
+ fsFactory.getFile(targetFile.getAbsolutePath() + RESOURCE_SUFFIX);
try {
if (deleteOriginFile) {
FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
@@ -2970,7 +2976,7 @@ public class DataRegion implements IDataRegionForQuery {
Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
}
- } catch (IOException e) {
+ } catch (final IOException e) {
logger.error(
"File renaming failed when loading .resource file. Origin: {},
Target: {}",
resourceFileToLoad.getAbsolutePath(),
@@ -2984,16 +2990,16 @@ public class DataRegion implements IDataRegionForQuery {
e.getMessage()));
}
- File modFileToLoad =
+ final File modFileToLoad =
fsFactory.getFile(tsFileToLoad.getAbsolutePath() +
ModificationFile.FILE_SUFFIX);
if (modFileToLoad.exists()) {
// when successfully loaded, the filepath of the resource will be changed to the IoTDB data
// dir, so we can add a suffix to find the old modification file.
- File targetModFile =
+ final File targetModFile =
fsFactory.getFile(targetFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX);
try {
Files.deleteIfExists(targetModFile.toPath());
- } catch (IOException e) {
+ } catch (final IOException e) {
logger.warn("Cannot delete localModFile {}", targetModFile, e);
}
try {
@@ -3002,7 +3008,7 @@ public class DataRegion implements IDataRegionForQuery {
} else {
Files.copy(modFileToLoad.toPath(), targetModFile.toPath());
}
- } catch (IOException e) {
+ } catch (final IOException e) {
logger.error(
"File renaming failed when loading .mod file. Origin: {}, Target:
{}",
modFileToLoad.getAbsolutePath(),
@@ -3018,6 +3024,10 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ // Listen before the tsFile is added into tsFile manager to avoid it being compacted
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToTsFile(dataRegionId, tsFileResource, true, isGeneratedByPipe);
+
// help tsfile resource degrade
tsFileResourceManager.registerSealedTsFileResource(tsFileResource);