This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new eff4720  [To rel/0.11] delete mods after merge (#2671)
eff4720 is described below

commit eff4720d7fd24163959f9a647356bb7935a27453
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Sat Feb 20 09:31:15 2021 +0800

    [To rel/0.11] delete mods after merge (#2671)
---
 .../level/LevelCompactionTsFileManagement.java     |  39 +++-
 .../engine/compaction/utils/CompactionUtils.java   |  39 ++--
 .../db/engine/compaction/CompactionChunkTest.java  | 215 +++++++++++++++++++++
 .../engine/compaction/LevelCompactionModsTest.java |  90 +++++++++
 .../compaction/LevelCompactionRecoverTest.java     |  10 +-
 5 files changed, 366 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index aaa6f8a..6792c68 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.engine.compaction.level;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static 
org.apache.iotdb.db.engine.compaction.no.NoCompactionTsFileManagement.compareFileName;
 import static 
org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
 import static 
org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
 import static 
org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
@@ -48,6 +47,8 @@ import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -89,6 +90,31 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     clear();
   }
 
+  public void renameLevelFilesMods(Collection<Modification> filterModification,
+      Collection<TsFileResource> mergeTsFiles,
+      TsFileResource targetTsFile) throws IOException {
+    logger.debug("{} [compaction] merge starts to rename real file's mod", 
storageGroupName);
+    List<Modification> modifications = new ArrayList<>();
+    for (TsFileResource mergeTsFile : mergeTsFiles) {
+      try (ModificationFile sourceModificationFile = new ModificationFile(
+          mergeTsFile.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
+        modifications.addAll(sourceModificationFile.getModifications());
+        if (sourceModificationFile.exists()) {
+          sourceModificationFile.remove();
+        }
+      }
+    }
+    modifications.removeAll(filterModification);
+    if (!modifications.isEmpty()) {
+      try (ModificationFile modificationFile = new ModificationFile(
+          targetTsFile.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
+        for (Modification modification : modifications) {
+          modificationFile.write(modification);
+        }
+      }
+    }
+  }
+
   private void deleteLevelFilesInDisk(Collection<TsFileResource> mergeTsFiles) 
{
     logger.debug("{} [compaction] merge starts to delete real file", 
storageGroupName);
     for (TsFileResource mergeTsFile : mergeTsFiles) {
@@ -375,9 +401,10 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             writer.close();
             CompactionLogger compactionLogger = new 
CompactionLogger(storageGroupDir,
                 storageGroupName);
+            List<Modification> modifications = new ArrayList<>();
             CompactionUtils
                 .merge(targetTsFileResource, getTsFileList(isSeq), 
storageGroupName,
-                    compactionLogger, deviceSet, isSeq);
+                    compactionLogger, deviceSet, isSeq, modifications);
             compactionLogger.close();
           } else {
             writer.close();
@@ -403,10 +430,11 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             writer.close();
             CompactionLogger compactionLogger = new 
CompactionLogger(storageGroupDir,
                 storageGroupName);
+            List<Modification> modifications = new ArrayList<>();
             CompactionUtils
                 .merge(targetResource, sourceTsFileResources, storageGroupName,
                     compactionLogger, deviceSet,
-                    isSeq);
+                    isSeq, modifications);
             // complete compaction and delete source file
             writeLock();
             try {
@@ -423,6 +451,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
               writeUnlock();
             }
             deleteLevelFilesInDisk(sourceTsFileResources);
+            renameLevelFilesMods(modifications, sourceTsFileResources, 
targetResource);
             compactionLogger.close();
           } else {
             writer.close();
@@ -557,10 +586,11 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             }
 
             TsFileResource newResource = new TsFileResource(newLevelFile);
+            List<Modification> modifications = new ArrayList<>();
             // merge, read from source files and write to target file
             CompactionUtils
                 .merge(newResource, toMergeTsFiles, storageGroupName, 
compactionLogger,
-                    new HashSet<>(), sequence);
+                    new HashSet<>(), sequence, modifications);
             logger.info(
                 "{} [Compaction] merged level-{}'s {} TsFiles to next level, 
and start to delete old files",
                 storageGroupName, i, toMergeTsFiles.size());
@@ -579,6 +609,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
               writeUnlock();
             }
             deleteLevelFilesInDisk(toMergeTsFiles);
+            renameLevelFilesMods(modifications, toMergeTsFiles, newResource);
             compactionLogger.close();
             File logFile = FSFactoryProducer.getFSFactory()
                 .getFile(storageGroupDir, storageGroupName + 
COMPACTION_LOG_NAME);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index bd31319..73bfcbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -54,7 +54,6 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +70,8 @@ public class CompactionUtils {
 
   private static Pair<ChunkMetadata, Chunk> readByAppendMerge(
       Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap,
-      Map<String, List<Modification>> modificationCache, PartialPath 
seriesPath)
+      Map<String, List<Modification>> modificationCache, PartialPath 
seriesPath,
+      List<Modification> modifications)
       throws IOException {
     ChunkMetadata newChunkMetadata = null;
     Chunk newChunk = null;
@@ -79,7 +79,8 @@ public class CompactionUtils {
         .entrySet()) {
       TsFileSequenceReader reader = entry.getKey();
       List<ChunkMetadata> chunkMetadataList = entry.getValue();
-      modifyChunkMetaDataWithCache(reader, chunkMetadataList, 
modificationCache, seriesPath);
+      modifyChunkMetaDataWithCache(reader, chunkMetadataList, 
modificationCache, seriesPath,
+          modifications);
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
         Chunk chunk = reader.readMemChunk(chunkMetadata);
         if (newChunkMetadata == null) {
@@ -97,13 +98,13 @@ public class CompactionUtils {
   private static long readByDeserializeMerge(
       Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap, 
long maxVersion,
       Map<Long, TimeValuePair> timeValuePairMap, Map<String, 
List<Modification>> modificationCache,
-      PartialPath seriesPath)
-      throws IOException {
+      PartialPath seriesPath, List<Modification> modifications) throws 
IOException {
     for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry : 
readerChunkMetadataMap
         .entrySet()) {
       TsFileSequenceReader reader = entry.getKey();
       List<ChunkMetadata> chunkMetadataList = entry.getValue();
-      modifyChunkMetaDataWithCache(reader, chunkMetadataList, 
modificationCache, seriesPath);
+      modifyChunkMetaDataWithCache(reader, chunkMetadataList, 
modificationCache, seriesPath,
+          modifications);
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
         maxVersion = Math.max(chunkMetadata.getVersion(), maxVersion);
         IChunkReader chunkReader = new ChunkReaderByTimestamp(
@@ -121,14 +122,14 @@ public class CompactionUtils {
     return maxVersion;
   }
 
-  private static long writeByAppendMerge(long maxVersion, String device,
+  public static long writeByAppendMerge(long maxVersion, String device,
       RateLimiter compactionWriteRateLimiter,
       Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
       TsFileResource targetResource, RestorableTsFileIOWriter writer,
-      Map<String, List<Modification>> modificationCache)
+      Map<String, List<Modification>> modificationCache, List<Modification> 
modifications)
       throws IOException, IllegalPathException {
     Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(entry.getValue(),
-        modificationCache, new PartialPath(device, entry.getKey()));
+        modificationCache, new PartialPath(device, entry.getKey()), 
modifications);
     ChunkMetadata newChunkMetadata = chunkPair.left;
     Chunk newChunk = chunkPair.right;
     if (newChunkMetadata != null && newChunk != null) {
@@ -143,15 +144,16 @@ public class CompactionUtils {
     return maxVersion;
   }
 
-  private static long writeByDeserializeMerge(long maxVersion, String device,
+  public static long writeByDeserializeMerge(long maxVersion, String device,
       RateLimiter compactionRateLimiter,
       Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
       TsFileResource targetResource, RestorableTsFileIOWriter writer,
-      Map<String, List<Modification>> modificationCache) throws IOException, 
IllegalPathException {
+      Map<String, List<Modification>> modificationCache, List<Modification> 
modifications)
+      throws IOException, IllegalPathException {
     Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
     Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap = 
entry.getValue();
     maxVersion = readByDeserializeMerge(readerChunkMetadataMap, maxVersion, 
timeValuePairMap,
-        modificationCache, new PartialPath(device, entry.getKey()));
+        modificationCache, new PartialPath(device, entry.getKey()), 
modifications);
     boolean isChunkMetadataEmpty = true;
     for (List<ChunkMetadata> chunkMetadataList : 
readerChunkMetadataMap.values()) {
       if (!chunkMetadataList.isEmpty()) {
@@ -210,7 +212,8 @@ public class CompactionUtils {
   public static void merge(TsFileResource targetResource,
       List<TsFileResource> tsFileResources, String storageGroup,
       CompactionLogger compactionLogger,
-      Set<String> devices, boolean sequence) throws IOException, 
IllegalPathException {
+      Set<String> devices, boolean sequence, List<Modification> modifications)
+      throws IOException, IllegalPathException {
     RestorableTsFileIOWriter writer = new 
RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new 
HashMap<>();
     Map<String, List<Modification>> modificationCache = new HashMap<>();
@@ -259,8 +262,7 @@ public class CompactionUtils {
         for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> 
entry : measurementChunkMetadataMap
             .entrySet()) {
           maxVersion = writeByDeserializeMerge(maxVersion, device, 
compactionWriteRateLimiter,
-              entry,
-              targetResource, writer, modificationCache);
+              entry, targetResource, writer, modificationCache, modifications);
         }
         writer.endChunkGroup();
         writer.writeVersion(maxVersion);
@@ -282,13 +284,13 @@ public class CompactionUtils {
             logger.debug("{} [Compaction] page enough large, use append 
merge", storageGroup);
             // append page in chunks, so we do not have to deserialize a chunk
             maxVersion = writeByAppendMerge(maxVersion, device, 
compactionWriteRateLimiter,
-                entry, targetResource, writer, modificationCache);
+                entry, targetResource, writer, modificationCache, 
modifications);
           } else {
             logger
                 .debug("{} [Compaction] page too small, use deserialize 
merge", storageGroup);
             // we have to deserialize chunks to merge pages
             maxVersion = writeByDeserializeMerge(maxVersion, device, 
compactionWriteRateLimiter,
-                entry, targetResource, writer, modificationCache);
+                entry, targetResource, writer, modificationCache, 
modifications);
           }
         }
         writer.endChunkGroup();
@@ -334,7 +336,7 @@ public class CompactionUtils {
 
   private static void modifyChunkMetaDataWithCache(TsFileSequenceReader reader,
       List<ChunkMetadata> chunkMetadataList, Map<String, List<Modification>> 
modificationCache,
-      PartialPath seriesPath) {
+      PartialPath seriesPath, List<Modification> usedModifications) {
     List<Modification> modifications =
         modificationCache.computeIfAbsent(reader.getFileName(),
             fileName -> new LinkedList<>(
@@ -344,6 +346,7 @@ public class CompactionUtils {
     for (Modification modification : modifications) {
       if (modification.getPath().matchFullPath(seriesPath)) {
         seriesModifications.add(modification);
+        usedModifications.add(modification);
       }
     }
     modifyChunkMetaData(chunkMetadataList, seriesModifications);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
new file mode 100644
index 0000000..efb24f8
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompactionChunkTest extends LevelCompactionTest {
+
+  File tempSGDir;
+
+  @Before
+  public void setUp() throws IOException, WriteProcessException, 
MetadataException {
+    super.setUp();
+    tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+    tempSGDir.mkdirs();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    FileUtils.deleteDirectory(tempSGDir);
+  }
+
+  @Test
+  public void testAppendMerge() throws IOException, IllegalPathException {
+    Map<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> 
measurementChunkMetadataMap = new HashMap<>();
+    List<TsFileResource> sourceTsfileResources = seqResources.subList(0, 2);
+    File file = new File(TestConstant.BASE_OUTPUT_PATH
+        .concat(0 + IoTDBConstant.FILE_NAME_SEPARATOR + 0 + 
IoTDBConstant.FILE_NAME_SEPARATOR + 1
+            + ".tsfile"));
+    TsFileResource targetTsfileResource = new TsFileResource(file);
+    RateLimiter compactionWriteRateLimiter = 
MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+    String device = COMPACTION_TEST_SG + PATH_SEPARATOR + "device0";
+    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(
+        targetTsfileResource.getTsFile());
+    writer.startChunkGroup(device);
+    for (TsFileResource tsFileResource : sourceTsfileResources) {
+      TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFileResource.getTsFilePath());
+      Map<String, List<ChunkMetadata>> chunkMetadataMap = 
reader.readChunkMetadataInDevice(device);
+      for (Entry<String, List<ChunkMetadata>> entry : 
chunkMetadataMap.entrySet()) {
+        for (ChunkMetadata chunkMetadata : entry.getValue()) {
+          Map<TsFileSequenceReader, List<ChunkMetadata>> 
readerChunkMetadataMap;
+          String measurementUid = chunkMetadata.getMeasurementUid();
+          if (measurementChunkMetadataMap.containsKey(measurementUid)) {
+            readerChunkMetadataMap = 
measurementChunkMetadataMap.get(measurementUid);
+          } else {
+            readerChunkMetadataMap = new LinkedHashMap<>();
+          }
+          List<ChunkMetadata> chunkMetadataList;
+          if (readerChunkMetadataMap.containsKey(reader)) {
+            chunkMetadataList = readerChunkMetadataMap.get(reader);
+          } else {
+            chunkMetadataList = new ArrayList<>();
+          }
+          chunkMetadataList.add(chunkMetadata);
+          readerChunkMetadataMap.put(reader, chunkMetadataList);
+          measurementChunkMetadataMap
+              .put(chunkMetadata.getMeasurementUid(), readerChunkMetadataMap);
+        }
+      }
+      for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry 
: measurementChunkMetadataMap
+          .entrySet()) {
+        CompactionUtils
+            .writeByAppendMerge(0, device, compactionWriteRateLimiter, entry, 
targetTsfileResource,
+                writer, new HashMap<>(), new ArrayList<>());
+      }
+      reader.close();
+    }
+    writer.endChunkGroup();
+    targetTsfileResource.serialize();
+    writer.endFile();
+    targetTsfileResource.close();
+
+    TsFileSequenceReader reader = new TsFileSequenceReader(file.getPath());
+    List<Path> paths = reader.getAllPaths();
+    for (Path path : paths) {
+      List<ChunkMetadata> chunkMetadataList = 
reader.getChunkMetadataList(path);
+      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        Chunk chunk = reader.readMemChunk(chunkMetadata);
+        IChunkReader chunkReader = new ChunkReaderByTimestamp(chunk);
+        long totalPointCount = 0;
+        while (chunkReader.hasNextSatisfiedPage()) {
+          BatchData batchData = chunkReader.nextPageData();
+          for (int i = 0; i < batchData.length(); i++) {
+            assertEquals(batchData.getTimeByIndex(i), 
batchData.getDoubleByIndex(i), 0.001);
+          }
+          totalPointCount += batchData.length();
+        }
+        assertEquals(totalPointCount, chunkMetadata.getNumOfPoints());
+      }
+    }
+    reader.close();
+  }
+
+  @Test
+  public void testDeserializeMerge() throws IOException, IllegalPathException {
+    Map<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> 
measurementChunkMetadataMap = new HashMap<>();
+    List<TsFileResource> sourceTsfileResources = seqResources.subList(0, 2);
+    File file = new File(TestConstant.BASE_OUTPUT_PATH
+        .concat(0 + IoTDBConstant.FILE_NAME_SEPARATOR + 0 + 
IoTDBConstant.FILE_NAME_SEPARATOR + 1
+            + ".tsfile"));
+    TsFileResource targetTsfileResource = new TsFileResource(file);
+    RateLimiter compactionWriteRateLimiter = 
MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+    String device = COMPACTION_TEST_SG + PATH_SEPARATOR + "device0";
+    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(
+        targetTsfileResource.getTsFile());
+    writer.startChunkGroup(device);
+    for (TsFileResource tsFileResource : sourceTsfileResources) {
+      TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFileResource.getTsFilePath());
+      Map<String, List<ChunkMetadata>> chunkMetadataMap = 
reader.readChunkMetadataInDevice(device);
+      for (Entry<String, List<ChunkMetadata>> entry : 
chunkMetadataMap.entrySet()) {
+        for (ChunkMetadata chunkMetadata : entry.getValue()) {
+          Map<TsFileSequenceReader, List<ChunkMetadata>> 
readerChunkMetadataMap;
+          String measurementUid = chunkMetadata.getMeasurementUid();
+          if (measurementChunkMetadataMap.containsKey(measurementUid)) {
+            readerChunkMetadataMap = 
measurementChunkMetadataMap.get(measurementUid);
+          } else {
+            readerChunkMetadataMap = new LinkedHashMap<>();
+          }
+          List<ChunkMetadata> chunkMetadataList;
+          if (readerChunkMetadataMap.containsKey(reader)) {
+            chunkMetadataList = readerChunkMetadataMap.get(reader);
+          } else {
+            chunkMetadataList = new ArrayList<>();
+          }
+          chunkMetadataList.add(chunkMetadata);
+          readerChunkMetadataMap.put(reader, chunkMetadataList);
+          measurementChunkMetadataMap
+              .put(chunkMetadata.getMeasurementUid(), readerChunkMetadataMap);
+        }
+      }
+      for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry 
: measurementChunkMetadataMap
+          .entrySet()) {
+        CompactionUtils
+            .writeByDeserializeMerge(0, device, compactionWriteRateLimiter, 
entry,
+                targetTsfileResource,
+                writer, new HashMap<>(), new ArrayList<>());
+      }
+      reader.close();
+    }
+    writer.endChunkGroup();
+    targetTsfileResource.serialize();
+    writer.endFile();
+    targetTsfileResource.close();
+
+    TsFileSequenceReader reader = new TsFileSequenceReader(file.getPath());
+    List<Path> paths = reader.getAllPaths();
+    for (Path path : paths) {
+      List<ChunkMetadata> chunkMetadataList = 
reader.getChunkMetadataList(path);
+      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        Chunk chunk = reader.readMemChunk(chunkMetadata);
+        IChunkReader chunkReader = new ChunkReaderByTimestamp(chunk);
+        long totalPointCount = 0;
+        while (chunkReader.hasNextSatisfiedPage()) {
+          BatchData batchData = chunkReader.nextPageData();
+          for (int i = 0; i < batchData.length(); i++) {
+            assertEquals(batchData.getTimeByIndex(i), 
batchData.getDoubleByIndex(i), 0.001);
+          }
+          totalPointCount += batchData.length();
+        }
+        assertEquals(totalPointCount, chunkMetadata.getNumOfPoints());
+      }
+    }
+    reader.close();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
new file mode 100644
index 0000000..462759c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.constant.TestConstant;
+import 
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LevelCompactionModsTest extends LevelCompactionTest {
+
+  File tempSGDir;
+
+  @Before
+  public void setUp() throws IOException, WriteProcessException, 
MetadataException {
+    super.setUp();
+    tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+    tempSGDir.mkdirs();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    FileUtils.deleteDirectory(tempSGDir);
+  }
+
+  @Test
+  public void testCompactionMods() throws IllegalPathException, IOException {
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement = new 
LevelCompactionTsFileManagement(
+        COMPACTION_TEST_SG, tempSGDir.getPath());
+    TsFileResource sourceTsFileResource = seqResources.get(0);
+    TsFileResource targetTsFileResource = seqResources.get(1);
+    List<Modification> filterModifications = new ArrayList<>();
+    Modification modification1;
+    Modification modification2;
+    try (ModificationFile sourceModificationFile = new ModificationFile(
+        sourceTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
+      modification1 = new Deletion(new PartialPath(deviceIds[0], "sensor0"), 
0, 0);
+      modification2 = new Deletion(new PartialPath(deviceIds[0], "sensor1"), 
0, 0);
+      sourceModificationFile.write(modification1);
+      sourceModificationFile.write(modification2);
+      filterModifications.add(modification1);
+    }
+    List<TsFileResource> sourceTsFileResources = new ArrayList<>();
+    sourceTsFileResources.add(sourceTsFileResource);
+    levelCompactionTsFileManagement
+        .renameLevelFilesMods(filterModifications, sourceTsFileResources, 
targetTsFileResource);
+    try (ModificationFile targetModificationFile = new ModificationFile(
+        targetTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
+      Collection<Modification> modifications = 
targetModificationFile.getModifications();
+      assertEquals(1, modifications.size());
+      assertEquals(modification2, modifications.stream().findFirst().get());
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index b401c03..d8a37a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -111,7 +111,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
                 + ".tsfile")));
     compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
     CompactionUtils.merge(targetTsFileResource, new 
ArrayList<>(seqResources.subList(0, 3)),
-        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
+        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true, new 
ArrayList<>());
     compactionLogger.close();
     levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
@@ -170,7 +170,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
                 + ".tsfile")));
     compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
     CompactionUtils.merge(targetTsFileResource, new 
ArrayList<>(seqResources.subList(0, 3)),
-        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
+        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true, new 
ArrayList<>());
     compactionLogger.close();
 
     BufferedReader logReader = new BufferedReader(
@@ -248,7 +248,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
                 + ".tsfile")));
     compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
     CompactionUtils.merge(targetTsFileResource, new 
ArrayList<>(seqResources.subList(0, 3)),
-        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
+        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true, new 
ArrayList<>());
     compactionLogger.close();
 
     BufferedReader logReader = new BufferedReader(
@@ -331,7 +331,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
                 + ".tsfile")));
     compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
     CompactionUtils.merge(targetTsFileResource, new 
ArrayList<>(seqResources.subList(0, 3)),
-        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), false);
+        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), false, new 
ArrayList<>());
     compactionLogger.close();
     levelCompactionTsFileManagement.addRecover(targetTsFileResource, false);
     levelCompactionTsFileManagement.recover();
@@ -483,7 +483,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
                 + ".tsfile")));
     compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
     CompactionUtils.merge(targetTsFileResource, new 
ArrayList<>(seqResources.subList(0, 3)),
-        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
+        COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true, new 
ArrayList<>());
     levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     compactionLogger.close();
     levelCompactionTsFileManagement.recover();

Reply via email to