This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 2f648680af4 [to rc/1.3.3] Fix the allocation of target file name in
repair data task (#13492)
2f648680af4 is described below
commit 2f648680af43a3211c38fbeba2963b5d6fef91f9
Author: shuwenwei <[email protected]>
AuthorDate: Fri Sep 13 09:56:27 2024 +0800
[to rc/1.3.3] Fix the allocation of target file name in repair data task
(#13492)
* fix repair task target file name allocation
* use constant
* fix size tiered compaction selector
* Revert "use constant"
This reverts commit fb646c2c7c5f195bb257e5092b69db8130f5f8d8.
* Revert "fix repair task target file name allocation"
This reverts commit 15e2c35a22230eae2f2c5682676a506820037608.
* allocate file timestamp
---
.../db/storageengine/dataregion/DataRegion.java | 3 ++
.../task/RepairUnsortedFileCompactionTask.java | 63 +++++++++++-----------
.../impl/NewSizeTieredCompactionSelector.java | 7 ++-
.../dataregion/tsfile/TsFileManager.java | 14 +++++
4 files changed, 55 insertions(+), 32 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 7d02ad4eb0a..0ecf7f7c81c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -70,6 +70,7 @@ import
org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.RepairUnsortedFileCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
@@ -681,6 +682,8 @@ public class DataRegion implements IDataRegionForQuery {
&& !config.isEnableCrossSpaceCompaction()) {
return;
}
+ RepairUnsortedFileCompactionTask.recoverAllocatedFileTimestamp(
+ tsFileManager.getMaxFileTimestampOfUnSequenceFile());
CompactionScheduleTaskManager.getInstance().registerDataRegion(this);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
index e4a2c568682..ccc02e9f5ed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
@@ -38,6 +38,7 @@ import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Repair the internal unsorted file by compaction and move it to unSequence
space after compaction
@@ -45,6 +46,14 @@ import java.util.concurrent.CountDownLatch;
*/
public class RepairUnsortedFileCompactionTask extends InnerSpaceCompactionTask
{
+ private static final AtomicLong lastAllocatedFileTimestamp = new
AtomicLong(Long.MAX_VALUE / 2);
+
+ public static void recoverAllocatedFileTimestamp(long timestamp) {
+ if (timestamp > lastAllocatedFileTimestamp.get()) {
+ lastAllocatedFileTimestamp.set(timestamp);
+ }
+ }
+
private final TsFileResource sourceFile;
private final boolean rewriteFile;
private CountDownLatch latch;
@@ -136,6 +145,10 @@ public class RepairUnsortedFileCompactionTask extends
InnerSpaceCompactionTask {
calculateSourceFilesAndTargetFiles();
isHoldingWriteLock = new boolean[this.filesView.sourceFilesInLog.size()];
Arrays.fill(isHoldingWriteLock, false);
+ logFile =
+ new File(
+ filesView.targetFilesInLog.get(0).getTsFilePath()
+ + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
}
@Override
@@ -148,38 +161,28 @@ public class RepairUnsortedFileCompactionTask extends
InnerSpaceCompactionTask {
}
private File generateTargetFile() throws IOException {
- String path = sourceFile.getTsFile().getParentFile().getPath();
- // if source file is sequence, the sequence data path should be replaced
to unsequence
+ String targetFileDir = sourceFile.getTsFile().getParentFile().getPath();
+ TsFileNameGenerator.TsFileName sourceFileName =
+ TsFileNameGenerator.getTsFileName(sourceFile.getTsFile().getName());
+ String fileNameStr =
+ String.format(
+ "%d-%d-%d-%d" + IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ sourceFile.isSeq()
+ ? lastAllocatedFileTimestamp.incrementAndGet()
+ : sourceFileName.getTime(),
+ sourceFile.isSeq() ? 0 : sourceFileName.getVersion(),
+ sourceFileName.getInnerCompactionCnt() + 1,
+ sourceFileName.getCrossCompactionCnt());
+ // if source file is sequence, the sequence data targetFileDir should be
replaced to unsequence
if (sourceFile.isSeq()) {
- int pos = path.lastIndexOf("sequence");
- path = path.substring(0, pos) + "unsequence" + path.substring(pos +
"sequence".length());
+ int pos = targetFileDir.lastIndexOf("sequence");
+ targetFileDir =
+ targetFileDir.substring(0, pos)
+ + "unsequence"
+ + targetFileDir.substring(pos + "sequence".length());
}
-
- TsFileNameGenerator.TsFileName tsFileName =
- TsFileNameGenerator.getTsFileName(sourceFile.getTsFile().getName());
-
- File targetTsFile = null;
- // set version = 0 to keep unsequence data cover sequence data
- do {
- String fileNameStr =
- String.format(
- "%d-%d-%d-%d" + IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
- tsFileName.getTime(),
- 0,
- tsFileName.getInnerCompactionCnt() + 1,
- 0);
- targetTsFile = new File(path + File.separator + fileNameStr);
- if (!targetTsFile.getParentFile().exists()) {
- targetTsFile.getParentFile().mkdirs();
- }
- // avoid same file name
- tsFileName.setTime(tsFileName.getTime() + 1);
- // Use the log file method to determine whether there are other
concurrent
- // repair tasks using the file with the same name.
- logFile =
- new File(targetTsFile.getPath() +
CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
- } while (!logFile.createNewFile());
- return targetTsFile;
+ File targetFile = new File(targetFileDir + File.separator + fileNameStr);
+ return targetFile;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
index 1dde62cb6a4..802e0afd140 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
@@ -238,25 +238,28 @@ public class NewSizeTieredCompactionSelector extends
SizeTieredCompactionSelecto
// size of a single file, merge all files together and try to include
// as many files as possible within the limit.
long totalFileSize = currentSelectedFileTotalSize +
currentSkippedFileTotalSize;
+ int totalFileNum = currentSelectedResources.size() +
currentSkippedResources.size();
nextTaskStartIndex = lastSelectedFileIndex + 1;
for (TsFileResource resource : lastContinuousSkippedResources) {
long currentFileSize = resource.getTsFileSize();
if (totalFileSize + currentFileSize > singleFileSizeThreshold
+ || totalFileNum > totalFileNumUpperBound
||
!isFileLevelSatisfied(resource.getTsFileID().getInnerCompactionCount())) {
break;
}
currentSkippedResources.add(resource);
totalFileSize += currentFileSize;
currentSkippedFileTotalSize += currentFileSize;
+ totalFileNum++;
nextTaskStartIndex++;
}
- int totalFileNum = currentSelectedResources.size() +
currentSkippedResources.size();
if (totalFileNum < 2) {
return;
}
- boolean canCompactAllFiles = totalFileSize <= singleFileSizeThreshold;
+ boolean canCompactAllFiles =
+ totalFileSize <= singleFileSizeThreshold && totalFileNum <=
totalFileNumUpperBound;
if (canCompactAllFiles) {
currentSelectedResources =
Stream.concat(currentSelectedResources.stream(),
currentSkippedResources.stream())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 50aee584f9c..919848ee540 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -407,4 +407,18 @@ public class TsFileManager {
readUnlock();
}
}
+
+ public long getMaxFileTimestampOfUnSequenceFile() {
+ long maxFileTimestamp = -1;
+ resourceListLock.readLock().lock();
+ try {
+ for (TsFileResourceList resourceList : unsequenceFiles.values()) {
+ TsFileResource lastResource = resourceList.get(resourceList.size() -
1);
+ maxFileTimestamp = Math.max(maxFileTimestamp,
lastResource.getTimePartition());
+ }
+ } finally {
+ resourceListLock.readLock().unlock();
+ }
+ return maxFileTimestamp;
+ }
}