This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 2f648680af4 [to rc/1.3.3] Fix the allocation of target file name in 
repair data task (#13492)
2f648680af4 is described below

commit 2f648680af43a3211c38fbeba2963b5d6fef91f9
Author: shuwenwei <[email protected]>
AuthorDate: Fri Sep 13 09:56:27 2024 +0800

    [to rc/1.3.3] Fix the allocation of target file name in repair data task 
(#13492)
    
    * fix repair task target file name allocation
    
    * use constant
    
    * fix size tiered compaction selector
    
    * Revert "use constant"
    
    This reverts commit fb646c2c7c5f195bb257e5092b69db8130f5f8d8.
    
    * Revert "fix repair task target file name allocation"
    
    This reverts commit 15e2c35a22230eae2f2c5682676a506820037608.
    
    * allocate file timestamp
---
 .../db/storageengine/dataregion/DataRegion.java    |  3 ++
 .../task/RepairUnsortedFileCompactionTask.java     | 63 +++++++++++-----------
 .../impl/NewSizeTieredCompactionSelector.java      |  7 ++-
 .../dataregion/tsfile/TsFileManager.java           | 14 +++++
 4 files changed, 55 insertions(+), 32 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 7d02ad4eb0a..0ecf7f7c81c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -70,6 +70,7 @@ import 
org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.RepairUnsortedFileCompactionTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
@@ -681,6 +682,8 @@ public class DataRegion implements IDataRegionForQuery {
         && !config.isEnableCrossSpaceCompaction()) {
       return;
     }
+    RepairUnsortedFileCompactionTask.recoverAllocatedFileTimestamp(
+        tsFileManager.getMaxFileTimestampOfUnSequenceFile());
     CompactionScheduleTaskManager.getInstance().registerDataRegion(this);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
index e4a2c568682..ccc02e9f5ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
@@ -38,6 +38,7 @@ import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Repair the internal unsorted file by compaction and move it to unSequence 
space after compaction
@@ -45,6 +46,14 @@ import java.util.concurrent.CountDownLatch;
  */
 public class RepairUnsortedFileCompactionTask extends InnerSpaceCompactionTask 
{
 
+  private static final AtomicLong lastAllocatedFileTimestamp = new 
AtomicLong(Long.MAX_VALUE / 2);
+
+  /**
+   * Raises the shared file timestamp counter to {@code timestamp} when that value is larger than
+   * the current one; the counter is never lowered by this method.
+   *
+   * <p>The update is performed as a single atomic max. A separate {@code get()} followed by
+   * {@code set()} would let a concurrent caller (or a concurrent {@code incrementAndGet()})
+   * store a larger value in between, which the subsequent {@code set()} would then overwrite
+   * with a smaller one.
+   *
+   * @param timestamp candidate lower bound for the counter
+   */
+  public static void recoverAllocatedFileTimestamp(long timestamp) {
+    lastAllocatedFileTimestamp.accumulateAndGet(timestamp, Math::max);
+  }
+
   private final TsFileResource sourceFile;
   private final boolean rewriteFile;
   private CountDownLatch latch;
@@ -136,6 +145,10 @@ public class RepairUnsortedFileCompactionTask extends 
InnerSpaceCompactionTask {
     calculateSourceFilesAndTargetFiles();
     isHoldingWriteLock = new boolean[this.filesView.sourceFilesInLog.size()];
     Arrays.fill(isHoldingWriteLock, false);
+    logFile =
+        new File(
+            filesView.targetFilesInLog.get(0).getTsFilePath()
+                + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
   }
 
   @Override
@@ -148,38 +161,28 @@ public class RepairUnsortedFileCompactionTask extends 
InnerSpaceCompactionTask {
   }
 
   private File generateTargetFile() throws IOException { // builds the target File: source file's directory (switched to "unsequence" for a sequence source) plus a name derived from the source file name
-    String path = sourceFile.getTsFile().getParentFile().getPath();
-    // if source file is sequence, the sequence data path should be replaced 
to unsequence
+    String targetFileDir = sourceFile.getTsFile().getParentFile().getPath();
+    TsFileNameGenerator.TsFileName sourceFileName =
+        TsFileNameGenerator.getTsFileName(sourceFile.getTsFile().getName());
+    String fileNameStr =
+        String.format(
+            "%d-%d-%d-%d" + IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+            sourceFile.isSeq()
+                ? lastAllocatedFileTimestamp.incrementAndGet() // sequence source: take the next value from the shared counter
+                : sourceFileName.getTime(), // unsequence source: reuse the source file name's time
+            sourceFile.isSeq() ? 0 : sourceFileName.getVersion(), // 0 for a sequence source, otherwise the source file name's version
+            sourceFileName.getInnerCompactionCnt() + 1, // inner compaction count of the source name, plus one
+            sourceFileName.getCrossCompactionCnt()); // cross compaction count of the source name, unchanged
+    // if source file is sequence, the sequence data targetFileDir should be 
replaced to unsequence
     if (sourceFile.isSeq()) {
-      int pos = path.lastIndexOf("sequence");
-      path = path.substring(0, pos) + "unsequence" + path.substring(pos + 
"sequence".length());
+      int pos = targetFileDir.lastIndexOf("sequence"); // NOTE(review): pos is -1 if "sequence" is absent, and substring(0, pos) would then throw; confirm the directory always contains it
+      targetFileDir =
+          targetFileDir.substring(0, pos)
+              + "unsequence"
+              + targetFileDir.substring(pos + "sequence".length());
     }
-
-    TsFileNameGenerator.TsFileName tsFileName =
-        TsFileNameGenerator.getTsFileName(sourceFile.getTsFile().getName());
-
-    File targetTsFile = null;
-    // set version = 0 to keep unsequence data cover sequence data
-    do {
-      String fileNameStr =
-          String.format(
-              "%d-%d-%d-%d" + IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
-              tsFileName.getTime(),
-              0,
-              tsFileName.getInnerCompactionCnt() + 1,
-              0);
-      targetTsFile = new File(path + File.separator + fileNameStr);
-      if (!targetTsFile.getParentFile().exists()) {
-        targetTsFile.getParentFile().mkdirs();
-      }
-      // avoid same file name
-      tsFileName.setTime(tsFileName.getTime() + 1);
-      // Use the log file method to determine whether there are other 
concurrent
-      // repair tasks using the file with the same name.
-      logFile =
-          new File(targetTsFile.getPath() + 
CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
-    } while (!logFile.createNewFile());
-    return targetTsFile;
+    File targetFile = new File(targetFileDir + File.separator + fileNameStr); // this version only builds the path; it does not create the file or its parent directory
+    return targetFile;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
index 1dde62cb6a4..802e0afd140 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
@@ -238,25 +238,28 @@ public class NewSizeTieredCompactionSelector extends 
SizeTieredCompactionSelecto
         // size of a single file, merge all files together and try to include
         // as many files as possible within the limit.
         long totalFileSize = currentSelectedFileTotalSize + 
currentSkippedFileTotalSize;
+        int totalFileNum = currentSelectedResources.size() + 
currentSkippedResources.size();
         nextTaskStartIndex = lastSelectedFileIndex + 1;
         for (TsFileResource resource : lastContinuousSkippedResources) {
           long currentFileSize = resource.getTsFileSize();
           if (totalFileSize + currentFileSize > singleFileSizeThreshold
+              || totalFileNum > totalFileNumUpperBound
               || 
!isFileLevelSatisfied(resource.getTsFileID().getInnerCompactionCount())) {
             break;
           }
           currentSkippedResources.add(resource);
           totalFileSize += currentFileSize;
           currentSkippedFileTotalSize += currentFileSize;
+          totalFileNum++;
           nextTaskStartIndex++;
         }
 
-        int totalFileNum = currentSelectedResources.size() + 
currentSkippedResources.size();
         if (totalFileNum < 2) {
           return;
         }
 
-        boolean canCompactAllFiles = totalFileSize <= singleFileSizeThreshold;
+        boolean canCompactAllFiles =
+            totalFileSize <= singleFileSizeThreshold && totalFileNum <= 
totalFileNumUpperBound;
         if (canCompactAllFiles) {
           currentSelectedResources =
               Stream.concat(currentSelectedResources.stream(), 
currentSkippedResources.stream())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 50aee584f9c..919848ee540 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -407,4 +407,18 @@ public class TsFileManager {
       readUnlock();
     }
   }
+
+  /**
+   * Inspects the last resource of every unsequence resource list, while holding the read lock of
+   * {@code resourceListLock}, and returns the largest value found.
+   *
+   * <p>Empty resource lists are skipped, so {@code get(size() - 1)} is never called with index
+   * {@code -1}.
+   *
+   * <p>NOTE(review): the value compared is {@code getTimePartition()} of the last resource, while
+   * the method name and its caller in {@code DataRegion} treat the result as a file timestamp.
+   * Confirm this is the intended quantity.
+   *
+   * @return the maximum over all non-empty unsequence resource lists, or {@code -1} if there is
+   *     none
+   */
+  public long getMaxFileTimestampOfUnSequenceFile() {
+    long maxFileTimestamp = -1;
+    resourceListLock.readLock().lock();
+    try {
+      for (TsFileResourceList resourceList : unsequenceFiles.values()) {
+        int resourceCount = resourceList.size();
+        if (resourceCount == 0) {
+          // Nothing to inspect in this list; index -1 would be invalid.
+          continue;
+        }
+        TsFileResource lastResource = resourceList.get(resourceCount - 1);
+        maxFileTimestamp = Math.max(maxFileTimestamp, lastResource.getTimePartition());
+      }
+    } finally {
+      resourceListLock.readLock().unlock();
+    }
+    return maxFileTimestamp;
+  }
 }

Reply via email to