This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a0e30f5157 [IOTDB-3745]Deduplicate mods of target files in compaction (#6591)
a0e30f5157 is described below

commit a0e30f515777bf76b37b0b86e6f24910bb3f35c1
Author: 周沛辰 <[email protected]>
AuthorDate: Thu Jul 7 08:48:42 2022 +0800

    [IOTDB-3745]Deduplicate mods of target files in compaction (#6591)
---
 .../db/engine/compaction/CompactionUtils.java      | 49 ++++++------
 .../engine/storagegroup/TsFileNameGenerator.java   | 23 ++++++
 .../cross/RewriteCrossSpaceCompactionTest.java     |  8 +-
 .../compaction/inner/InnerSeqCompactionTest.java   | 90 ++++++++++++++++++++++
 4 files changed, 139 insertions(+), 31 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 096f03cbf4..fb143e1e6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -39,11 +39,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * This tool can be used to perform inner space or cross space compaction of 
aligned and non aligned
@@ -115,9 +116,19 @@ public class CompactionUtils {
           tsFileResource);
     }
     // update each target mods file.
-    for (TsFileResource tsFileResource : targetResources) {
-      updateOneTargetMods(
-          tsFileResource, 
seqFileInfoMap.get(tsFileResource.getTsFile().getName()), unseqResources);
+    for (TsFileResource targetResource : targetResources) {
+      TsFileResource seqFile = 
seqFileInfoMap.get(targetResource.getTsFile().getName());
+      Set<Modification> modifications = new HashSet<>();
+      if (seqFile != null) {
+        // get compaction mods from its corresponding source seq file
+        
modifications.addAll(ModificationFile.getCompactionMods(seqFile).getModifications());
+      }
+      // get compaction mods from all source unseq files
+      for (TsFileResource unseqFile : unseqResources) {
+        
modifications.addAll(ModificationFile.getCompactionMods(unseqFile).getModifications());
+      }
+
+      updateOneTargetMods(targetResource, modifications);
     }
   }
 
@@ -127,15 +138,20 @@ public class CompactionUtils {
    */
   public static void combineModsInInnerCompaction(
       Collection<TsFileResource> sourceFiles, TsFileResource targetTsFile) 
throws IOException {
-    List<Modification> modifications = new ArrayList<>();
+    Set<Modification> modifications = new HashSet<>();
     for (TsFileResource mergeTsFile : sourceFiles) {
       try (ModificationFile sourceCompactionModificationFile =
           ModificationFile.getCompactionMods(mergeTsFile)) {
         
modifications.addAll(sourceCompactionModificationFile.getModifications());
       }
     }
+    updateOneTargetMods(targetTsFile, modifications);
+  }
+
+  private static void updateOneTargetMods(
+      TsFileResource targetFile, Set<Modification> modifications) throws 
IOException {
     if (!modifications.isEmpty()) {
-      try (ModificationFile modificationFile = 
ModificationFile.getNormalMods(targetTsFile)) {
+      try (ModificationFile modificationFile = 
ModificationFile.getNormalMods(targetFile)) {
         for (Modification modification : modifications) {
           // we have to set modification offset to MAX_VALUE, as the offset of 
source chunk may
           // change after compaction
@@ -146,27 +162,6 @@ public class CompactionUtils {
     }
   }
 
-  private static void updateOneTargetMods(
-      TsFileResource targetFile, TsFileResource seqFile, List<TsFileResource> 
unseqFiles)
-      throws IOException {
-    // write mods in the seq file
-    if (seqFile != null) {
-      ModificationFile seqCompactionModificationFile = 
ModificationFile.getCompactionMods(seqFile);
-      for (Modification modification : 
seqCompactionModificationFile.getModifications()) {
-        targetFile.getModFile().write(modification);
-      }
-    }
-    // write mods in all unseq files
-    for (TsFileResource unseqFile : unseqFiles) {
-      ModificationFile compactionUnseqModificationFile =
-          ModificationFile.getCompactionMods(unseqFile);
-      for (Modification modification : 
compactionUnseqModificationFile.getModifications()) {
-        targetFile.getModFile().write(modification);
-      }
-    }
-    targetFile.getModFile().close();
-  }
-
   public static void deleteCompactionModsFile(
       List<TsFileResource> selectedSeqTsFileResourceList,
       List<TsFileResource> selectedUnSeqTsFileResourceList)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
index 946d827461..8c15c99ed6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -122,6 +123,7 @@ public class TsFileNameGenerator {
     }
   }
 
+  @TestOnly
   public static TsFileResource increaseCrossCompactionCnt(TsFileResource 
tsFileResource)
       throws IOException {
     File tsFile = tsFileResource.getTsFile();
@@ -142,6 +144,27 @@ public class TsFileNameGenerator {
     return tsFileResource;
   }
 
+  @TestOnly
+  public static TsFileResource increaseInnerCompactionCnt(TsFileResource 
tsFileResource)
+      throws IOException {
+    File tsFile = tsFileResource.getTsFile();
+    String path = tsFile.getParent();
+    TsFileName tsFileName = 
getTsFileName(tsFileResource.getTsFile().getName());
+    tsFileName.setInnerCompactionCnt(tsFileName.getInnerCompactionCnt() + 1);
+    tsFileResource.setFile(
+        new File(
+            path,
+            tsFileName.time
+                + FILE_NAME_SEPARATOR
+                + tsFileName.version
+                + FILE_NAME_SEPARATOR
+                + tsFileName.innerCompactionCnt
+                + FILE_NAME_SEPARATOR
+                + tsFileName.crossCompactionCnt
+                + TSFILE_SUFFIX));
+    return tsFileResource;
+  }
+
   public static File increaseCrossCompactionCnt(File tsFile) throws 
IOException {
     String path = tsFile.getParent();
     TsFileName tsFileName = getTsFileName(tsFile.getName());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
index fa3181d854..b7d5f709d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
@@ -246,7 +246,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
                   .replace(CROSS_COMPACTION_TMP_FILE_SUFFIX, 
TsFileConstant.TSFILE_SUFFIX)));
       resource.resetModFile();
       Assert.assertTrue(resource.getModFile().exists());
-      Assert.assertEquals(24, resource.getModFile().getModifications().size());
+      Assert.assertEquals(4, resource.getModFile().getModifications().size());
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
     for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -483,7 +483,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
         continue;
       }
       Assert.assertTrue(resource.getModFile().exists());
-      Assert.assertEquals(180, 
resource.getModFile().getModifications().size());
+      Assert.assertEquals(30, resource.getModFile().getModifications().size());
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
 
@@ -662,7 +662,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
           new TsFileResource(
               
TsFileNameGenerator.increaseCrossCompactionCnt(seqResource.getTsFile()));
       Assert.assertTrue(resource.getModFile().exists());
-      Assert.assertEquals(6, resource.getModFile().getModifications().size());
+      Assert.assertEquals(1, resource.getModFile().getModifications().size());
       Assert.assertFalse(resource.getCompactionModFile().exists());
     }
   }
@@ -784,7 +784,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
           new TsFileResource(
               
TsFileNameGenerator.increaseCrossCompactionCnt(seqResource.getTsFile()));
       Assert.assertTrue(resource.getModFile().exists());
-      Assert.assertEquals(12, resource.getModFile().getModifications().size());
+      Assert.assertEquals(2, resource.getModFile().getModifications().size());
       Assert.assertFalse(resource.getCompactionModFile().exists());
     }
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
index 8c52951785..09e17a763f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.CompactionUtils;
@@ -34,7 +35,11 @@ import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
 import 
org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionTimeseriesType;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -46,6 +51,7 @@ import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -57,6 +63,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putChunk;
 import static 
org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putOnePageChunk;
@@ -91,6 +98,7 @@ public class InnerSeqCompactionTest {
   public void setUp() throws MetadataException {
     prevMaxDegreeOfIndexNode = 
TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode();
     TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(2);
+    EnvironmentUtils.envSetUp();
     IoTDB.configManager.init();
     IoTDB.schemaProcessor.setStorageGroup(new PartialPath(COMPACTION_TEST_SG));
     for (String fullPath : fullPaths) {
@@ -111,6 +119,7 @@ public class InnerSeqCompactionTest {
     ChunkCache.getInstance().clear();
     TimeSeriesMetadataCache.getInstance().clear();
     IoTDB.configManager.clear();
+    EnvironmentUtils.cleanEnv();
     EnvironmentUtils.cleanAllDir();
     
TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(prevMaxDegreeOfIndexNode);
   }
@@ -967,4 +976,85 @@ public class InnerSeqCompactionTest {
       
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(prevTargetChunkSize);
     }
   }
+
+  @Test
+  public void testCompactionWithDeletionsDuringCompactions()
+      throws MetadataException, IOException, DataRegionException {
+    // create source seq files
+    List<TsFileResource> sourceResources = new ArrayList<>();
+    List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+    List<Long> pagePointsNum = new ArrayList<>();
+    pagePointsNum.add(100L);
+    chunkPagePointsNum.add(pagePointsNum);
+    pagePointsNum = new ArrayList<>();
+    pagePointsNum.add(200L);
+    chunkPagePointsNum.add(pagePointsNum);
+    pagePointsNum = new ArrayList<>();
+    pagePointsNum.add(300L);
+    chunkPagePointsNum.add(pagePointsNum);
+    Set<String> paths = new HashSet<>();
+    for (int i = 0; i < fullPaths.length; i++) {
+      paths.add(fullPaths[i]);
+    }
+
+    for (int i = 0; i < 5; i++) {
+      TsFileResource tsFileResource =
+          CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1);
+      CompactionFileGeneratorUtils.writeTsFile(paths, chunkPagePointsNum, i * 
600L, tsFileResource);
+      sourceResources.add(tsFileResource);
+    }
+    DataRegion vsgp =
+        new DataRegion(
+            TestConstant.BASE_OUTPUT_PATH,
+            "0",
+            new TsFileFlushPolicy.DirectFlushPolicy(),
+            COMPACTION_TEST_SG);
+    vsgp.getTsFileResourceManager().addAll(sourceResources, true);
+    // delete data before compaction
+    vsgp.delete(new PartialPath(fullPaths[0]), 0, 1000, 0, null);
+
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0,
+            vsgp.getTsFileResourceManager(),
+            sourceResources,
+            true,
+            new ReadChunkCompactionPerformer(),
+            new AtomicInteger(0),
+            0);
+    task.setSourceFilesToCompactionCandidate();
+    task.checkValidAndSetMerging();
+    // delete data during compaction
+    vsgp.delete(new PartialPath(fullPaths[0]), 0, 1200, 0, null);
+    vsgp.delete(new PartialPath(fullPaths[0]), 0, 1800, 0, null);
+    for (int i = 0; i < sourceResources.size() - 1; i++) {
+      TsFileResource resource = sourceResources.get(i);
+      resource.resetModFile();
+      Assert.assertTrue(resource.getCompactionModFile().exists());
+      Assert.assertTrue(resource.getModFile().exists());
+      if (i < 2) {
+        Assert.assertEquals(3, 
resource.getModFile().getModifications().size());
+        Assert.assertEquals(2, 
resource.getCompactionModFile().getModifications().size());
+      } else if (i < 3) {
+        Assert.assertEquals(2, 
resource.getModFile().getModifications().size());
+        Assert.assertEquals(2, 
resource.getCompactionModFile().getModifications().size());
+      } else {
+        Assert.assertEquals(1, 
resource.getModFile().getModifications().size());
+        Assert.assertEquals(1, 
resource.getCompactionModFile().getModifications().size());
+      }
+    }
+    task.start();
+    for (TsFileResource resource : sourceResources) {
+      Assert.assertFalse(resource.getTsFile().exists());
+      Assert.assertFalse(resource.getModFile().exists());
+      Assert.assertFalse(resource.getCompactionModFile().exists());
+    }
+
+    TsFileResource resource =
+        TsFileNameGenerator.increaseInnerCompactionCnt(sourceResources.get(0));
+    resource.resetModFile();
+    Assert.assertTrue(resource.getModFile().exists());
+    Assert.assertEquals(2, resource.getModFile().getModifications().size());
+    Assert.assertFalse(resource.getCompactionModFile().exists());
+  }
 }

Reply via email to