This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new eff4720 [To rel/0.11] delete mods after merge (#2671)
eff4720 is described below
commit eff4720d7fd24163959f9a647356bb7935a27453
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Sat Feb 20 09:31:15 2021 +0800
[To rel/0.11] delete mods after merge (#2671)
---
.../level/LevelCompactionTsFileManagement.java | 39 +++-
.../engine/compaction/utils/CompactionUtils.java | 39 ++--
.../db/engine/compaction/CompactionChunkTest.java | 215 +++++++++++++++++++++
.../engine/compaction/LevelCompactionModsTest.java | 90 +++++++++
.../compaction/LevelCompactionRecoverTest.java | 10 +-
5 files changed, 366 insertions(+), 27 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index aaa6f8a..6792c68 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.engine.compaction.level;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static
org.apache.iotdb.db.engine.compaction.no.NoCompactionTsFileManagement.compareFileName;
import static
org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
import static
org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
import static
org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
@@ -48,6 +47,8 @@ import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -89,6 +90,31 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
clear();
}
+ public void renameLevelFilesMods(Collection<Modification> filterModification,
+ Collection<TsFileResource> mergeTsFiles,
+ TsFileResource targetTsFile) throws IOException {
+ logger.debug("{} [compaction] merge starts to rename real file's mod",
storageGroupName);
+ List<Modification> modifications = new ArrayList<>();
+ for (TsFileResource mergeTsFile : mergeTsFiles) {
+ try (ModificationFile sourceModificationFile = new ModificationFile(
+ mergeTsFile.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
+ modifications.addAll(sourceModificationFile.getModifications());
+ if (sourceModificationFile.exists()) {
+ sourceModificationFile.remove();
+ }
+ }
+ }
+ modifications.removeAll(filterModification);
+ if (!modifications.isEmpty()) {
+ try (ModificationFile modificationFile = new ModificationFile(
+ targetTsFile.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
+ for (Modification modification : modifications) {
+ modificationFile.write(modification);
+ }
+ }
+ }
+ }
+
private void deleteLevelFilesInDisk(Collection<TsFileResource> mergeTsFiles)
{
logger.debug("{} [compaction] merge starts to delete real file",
storageGroupName);
for (TsFileResource mergeTsFile : mergeTsFiles) {
@@ -375,9 +401,10 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
writer.close();
CompactionLogger compactionLogger = new
CompactionLogger(storageGroupDir,
storageGroupName);
+ List<Modification> modifications = new ArrayList<>();
CompactionUtils
.merge(targetTsFileResource, getTsFileList(isSeq),
storageGroupName,
- compactionLogger, deviceSet, isSeq);
+ compactionLogger, deviceSet, isSeq, modifications);
compactionLogger.close();
} else {
writer.close();
@@ -403,10 +430,11 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
writer.close();
CompactionLogger compactionLogger = new
CompactionLogger(storageGroupDir,
storageGroupName);
+ List<Modification> modifications = new ArrayList<>();
CompactionUtils
.merge(targetResource, sourceTsFileResources, storageGroupName,
compactionLogger, deviceSet,
- isSeq);
+ isSeq, modifications);
// complete compaction and delete source file
writeLock();
try {
@@ -423,6 +451,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
writeUnlock();
}
deleteLevelFilesInDisk(sourceTsFileResources);
+ renameLevelFilesMods(modifications, sourceTsFileResources,
targetResource);
compactionLogger.close();
} else {
writer.close();
@@ -557,10 +586,11 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
}
TsFileResource newResource = new TsFileResource(newLevelFile);
+ List<Modification> modifications = new ArrayList<>();
// merge, read from source files and write to target file
CompactionUtils
.merge(newResource, toMergeTsFiles, storageGroupName,
compactionLogger,
- new HashSet<>(), sequence);
+ new HashSet<>(), sequence, modifications);
logger.info(
"{} [Compaction] merged level-{}'s {} TsFiles to next level,
and start to delete old files",
storageGroupName, i, toMergeTsFiles.size());
@@ -579,6 +609,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
writeUnlock();
}
deleteLevelFilesInDisk(toMergeTsFiles);
+ renameLevelFilesMods(modifications, toMergeTsFiles, newResource);
compactionLogger.close();
File logFile = FSFactoryProducer.getFSFactory()
.getFile(storageGroupDir, storageGroupName +
COMPACTION_LOG_NAME);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index bd31319..73bfcbe 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -54,7 +54,6 @@ import
org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,7 +70,8 @@ public class CompactionUtils {
private static Pair<ChunkMetadata, Chunk> readByAppendMerge(
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap,
- Map<String, List<Modification>> modificationCache, PartialPath
seriesPath)
+ Map<String, List<Modification>> modificationCache, PartialPath
seriesPath,
+ List<Modification> modifications)
throws IOException {
ChunkMetadata newChunkMetadata = null;
Chunk newChunk = null;
@@ -79,7 +79,8 @@ public class CompactionUtils {
.entrySet()) {
TsFileSequenceReader reader = entry.getKey();
List<ChunkMetadata> chunkMetadataList = entry.getValue();
- modifyChunkMetaDataWithCache(reader, chunkMetadataList,
modificationCache, seriesPath);
+ modifyChunkMetaDataWithCache(reader, chunkMetadataList,
modificationCache, seriesPath,
+ modifications);
for (ChunkMetadata chunkMetadata : chunkMetadataList) {
Chunk chunk = reader.readMemChunk(chunkMetadata);
if (newChunkMetadata == null) {
@@ -97,13 +98,13 @@ public class CompactionUtils {
private static long readByDeserializeMerge(
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap,
long maxVersion,
Map<Long, TimeValuePair> timeValuePairMap, Map<String,
List<Modification>> modificationCache,
- PartialPath seriesPath)
- throws IOException {
+ PartialPath seriesPath, List<Modification> modifications) throws
IOException {
for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry :
readerChunkMetadataMap
.entrySet()) {
TsFileSequenceReader reader = entry.getKey();
List<ChunkMetadata> chunkMetadataList = entry.getValue();
- modifyChunkMetaDataWithCache(reader, chunkMetadataList,
modificationCache, seriesPath);
+ modifyChunkMetaDataWithCache(reader, chunkMetadataList,
modificationCache, seriesPath,
+ modifications);
for (ChunkMetadata chunkMetadata : chunkMetadataList) {
maxVersion = Math.max(chunkMetadata.getVersion(), maxVersion);
IChunkReader chunkReader = new ChunkReaderByTimestamp(
@@ -121,14 +122,14 @@ public class CompactionUtils {
return maxVersion;
}
- private static long writeByAppendMerge(long maxVersion, String device,
+ public static long writeByAppendMerge(long maxVersion, String device,
RateLimiter compactionWriteRateLimiter,
Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
TsFileResource targetResource, RestorableTsFileIOWriter writer,
- Map<String, List<Modification>> modificationCache)
+ Map<String, List<Modification>> modificationCache, List<Modification>
modifications)
throws IOException, IllegalPathException {
Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(entry.getValue(),
- modificationCache, new PartialPath(device, entry.getKey()));
+ modificationCache, new PartialPath(device, entry.getKey()),
modifications);
ChunkMetadata newChunkMetadata = chunkPair.left;
Chunk newChunk = chunkPair.right;
if (newChunkMetadata != null && newChunk != null) {
@@ -143,15 +144,16 @@ public class CompactionUtils {
return maxVersion;
}
- private static long writeByDeserializeMerge(long maxVersion, String device,
+ public static long writeByDeserializeMerge(long maxVersion, String device,
RateLimiter compactionRateLimiter,
Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
TsFileResource targetResource, RestorableTsFileIOWriter writer,
- Map<String, List<Modification>> modificationCache) throws IOException,
IllegalPathException {
+ Map<String, List<Modification>> modificationCache, List<Modification>
modifications)
+ throws IOException, IllegalPathException {
Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap =
entry.getValue();
maxVersion = readByDeserializeMerge(readerChunkMetadataMap, maxVersion,
timeValuePairMap,
- modificationCache, new PartialPath(device, entry.getKey()));
+ modificationCache, new PartialPath(device, entry.getKey()),
modifications);
boolean isChunkMetadataEmpty = true;
for (List<ChunkMetadata> chunkMetadataList :
readerChunkMetadataMap.values()) {
if (!chunkMetadataList.isEmpty()) {
@@ -210,7 +212,8 @@ public class CompactionUtils {
public static void merge(TsFileResource targetResource,
List<TsFileResource> tsFileResources, String storageGroup,
CompactionLogger compactionLogger,
- Set<String> devices, boolean sequence) throws IOException,
IllegalPathException {
+ Set<String> devices, boolean sequence, List<Modification> modifications)
+ throws IOException, IllegalPathException {
RestorableTsFileIOWriter writer = new
RestorableTsFileIOWriter(targetResource.getTsFile());
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new
HashMap<>();
Map<String, List<Modification>> modificationCache = new HashMap<>();
@@ -259,8 +262,7 @@ public class CompactionUtils {
for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
entry : measurementChunkMetadataMap
.entrySet()) {
maxVersion = writeByDeserializeMerge(maxVersion, device,
compactionWriteRateLimiter,
- entry,
- targetResource, writer, modificationCache);
+ entry, targetResource, writer, modificationCache, modifications);
}
writer.endChunkGroup();
writer.writeVersion(maxVersion);
@@ -282,13 +284,13 @@ public class CompactionUtils {
logger.debug("{} [Compaction] page enough large, use append
merge", storageGroup);
// append page in chunks, so we do not have to deserialize a chunk
maxVersion = writeByAppendMerge(maxVersion, device,
compactionWriteRateLimiter,
- entry, targetResource, writer, modificationCache);
+ entry, targetResource, writer, modificationCache,
modifications);
} else {
logger
.debug("{} [Compaction] page too small, use deserialize
merge", storageGroup);
// we have to deserialize chunks to merge pages
maxVersion = writeByDeserializeMerge(maxVersion, device,
compactionWriteRateLimiter,
- entry, targetResource, writer, modificationCache);
+ entry, targetResource, writer, modificationCache,
modifications);
}
}
writer.endChunkGroup();
@@ -334,7 +336,7 @@ public class CompactionUtils {
private static void modifyChunkMetaDataWithCache(TsFileSequenceReader reader,
List<ChunkMetadata> chunkMetadataList, Map<String, List<Modification>>
modificationCache,
- PartialPath seriesPath) {
+ PartialPath seriesPath, List<Modification> usedModifications) {
List<Modification> modifications =
modificationCache.computeIfAbsent(reader.getFileName(),
fileName -> new LinkedList<>(
@@ -344,6 +346,7 @@ public class CompactionUtils {
for (Modification modification : modifications) {
if (modification.getPath().matchFullPath(seriesPath)) {
seriesModifications.add(modification);
+ usedModifications.add(modification);
}
}
modifyChunkMetaData(chunkMetadataList, seriesModifications);
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
new file mode 100644
index 0000000..efb24f8
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompactionChunkTest extends LevelCompactionTest {
+
+ File tempSGDir;
+
+ @Before
+ public void setUp() throws IOException, WriteProcessException,
MetadataException {
+ super.setUp();
+ tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+ tempSGDir.mkdirs();
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ FileUtils.deleteDirectory(tempSGDir);
+ }
+
+ @Test
+ public void testAppendMerge() throws IOException, IllegalPathException {
+ Map<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
measurementChunkMetadataMap = new HashMap<>();
+ List<TsFileResource> sourceTsfileResources = seqResources.subList(0, 2);
+ File file = new File(TestConstant.BASE_OUTPUT_PATH
+ .concat(0 + IoTDBConstant.FILE_NAME_SEPARATOR + 0 +
IoTDBConstant.FILE_NAME_SEPARATOR + 1
+ + ".tsfile"));
+ TsFileResource targetTsfileResource = new TsFileResource(file);
+ RateLimiter compactionWriteRateLimiter =
MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+ String device = COMPACTION_TEST_SG + PATH_SEPARATOR + "device0";
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(
+ targetTsfileResource.getTsFile());
+ writer.startChunkGroup(device);
+ for (TsFileResource tsFileResource : sourceTsfileResources) {
+ TsFileSequenceReader reader = new
TsFileSequenceReader(tsFileResource.getTsFilePath());
+ Map<String, List<ChunkMetadata>> chunkMetadataMap =
reader.readChunkMetadataInDevice(device);
+ for (Entry<String, List<ChunkMetadata>> entry :
chunkMetadataMap.entrySet()) {
+ for (ChunkMetadata chunkMetadata : entry.getValue()) {
+ Map<TsFileSequenceReader, List<ChunkMetadata>>
readerChunkMetadataMap;
+ String measurementUid = chunkMetadata.getMeasurementUid();
+ if (measurementChunkMetadataMap.containsKey(measurementUid)) {
+ readerChunkMetadataMap =
measurementChunkMetadataMap.get(measurementUid);
+ } else {
+ readerChunkMetadataMap = new LinkedHashMap<>();
+ }
+ List<ChunkMetadata> chunkMetadataList;
+ if (readerChunkMetadataMap.containsKey(reader)) {
+ chunkMetadataList = readerChunkMetadataMap.get(reader);
+ } else {
+ chunkMetadataList = new ArrayList<>();
+ }
+ chunkMetadataList.add(chunkMetadata);
+ readerChunkMetadataMap.put(reader, chunkMetadataList);
+ measurementChunkMetadataMap
+ .put(chunkMetadata.getMeasurementUid(), readerChunkMetadataMap);
+ }
+ }
+ for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry
: measurementChunkMetadataMap
+ .entrySet()) {
+ CompactionUtils
+ .writeByAppendMerge(0, device, compactionWriteRateLimiter, entry,
targetTsfileResource,
+ writer, new HashMap<>(), new ArrayList<>());
+ }
+ reader.close();
+ }
+ writer.endChunkGroup();
+ targetTsfileResource.serialize();
+ writer.endFile();
+ targetTsfileResource.close();
+
+ TsFileSequenceReader reader = new TsFileSequenceReader(file.getPath());
+ List<Path> paths = reader.getAllPaths();
+ for (Path path : paths) {
+ List<ChunkMetadata> chunkMetadataList =
reader.getChunkMetadataList(path);
+ for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ Chunk chunk = reader.readMemChunk(chunkMetadata);
+ IChunkReader chunkReader = new ChunkReaderByTimestamp(chunk);
+ long totalPointCount = 0;
+ while (chunkReader.hasNextSatisfiedPage()) {
+ BatchData batchData = chunkReader.nextPageData();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i),
batchData.getDoubleByIndex(i), 0.001);
+ }
+ totalPointCount += batchData.length();
+ }
+ assertEquals(totalPointCount, chunkMetadata.getNumOfPoints());
+ }
+ }
+ reader.close();
+ }
+
+ @Test
+ public void testDeserializeMerge() throws IOException, IllegalPathException {
+ Map<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
measurementChunkMetadataMap = new HashMap<>();
+ List<TsFileResource> sourceTsfileResources = seqResources.subList(0, 2);
+ File file = new File(TestConstant.BASE_OUTPUT_PATH
+ .concat(0 + IoTDBConstant.FILE_NAME_SEPARATOR + 0 +
IoTDBConstant.FILE_NAME_SEPARATOR + 1
+ + ".tsfile"));
+ TsFileResource targetTsfileResource = new TsFileResource(file);
+ RateLimiter compactionWriteRateLimiter =
MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+ String device = COMPACTION_TEST_SG + PATH_SEPARATOR + "device0";
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(
+ targetTsfileResource.getTsFile());
+ writer.startChunkGroup(device);
+ for (TsFileResource tsFileResource : sourceTsfileResources) {
+ TsFileSequenceReader reader = new
TsFileSequenceReader(tsFileResource.getTsFilePath());
+ Map<String, List<ChunkMetadata>> chunkMetadataMap =
reader.readChunkMetadataInDevice(device);
+ for (Entry<String, List<ChunkMetadata>> entry :
chunkMetadataMap.entrySet()) {
+ for (ChunkMetadata chunkMetadata : entry.getValue()) {
+ Map<TsFileSequenceReader, List<ChunkMetadata>>
readerChunkMetadataMap;
+ String measurementUid = chunkMetadata.getMeasurementUid();
+ if (measurementChunkMetadataMap.containsKey(measurementUid)) {
+ readerChunkMetadataMap =
measurementChunkMetadataMap.get(measurementUid);
+ } else {
+ readerChunkMetadataMap = new LinkedHashMap<>();
+ }
+ List<ChunkMetadata> chunkMetadataList;
+ if (readerChunkMetadataMap.containsKey(reader)) {
+ chunkMetadataList = readerChunkMetadataMap.get(reader);
+ } else {
+ chunkMetadataList = new ArrayList<>();
+ }
+ chunkMetadataList.add(chunkMetadata);
+ readerChunkMetadataMap.put(reader, chunkMetadataList);
+ measurementChunkMetadataMap
+ .put(chunkMetadata.getMeasurementUid(), readerChunkMetadataMap);
+ }
+ }
+ for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry
: measurementChunkMetadataMap
+ .entrySet()) {
+ CompactionUtils
+ .writeByDeserializeMerge(0, device, compactionWriteRateLimiter,
entry,
+ targetTsfileResource,
+ writer, new HashMap<>(), new ArrayList<>());
+ }
+ reader.close();
+ }
+ writer.endChunkGroup();
+ targetTsfileResource.serialize();
+ writer.endFile();
+ targetTsfileResource.close();
+
+ TsFileSequenceReader reader = new TsFileSequenceReader(file.getPath());
+ List<Path> paths = reader.getAllPaths();
+ for (Path path : paths) {
+ List<ChunkMetadata> chunkMetadataList =
reader.getChunkMetadataList(path);
+ for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ Chunk chunk = reader.readMemChunk(chunkMetadata);
+ IChunkReader chunkReader = new ChunkReaderByTimestamp(chunk);
+ long totalPointCount = 0;
+ while (chunkReader.hasNextSatisfiedPage()) {
+ BatchData batchData = chunkReader.nextPageData();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i),
batchData.getDoubleByIndex(i), 0.001);
+ }
+ totalPointCount += batchData.length();
+ }
+ assertEquals(totalPointCount, chunkMetadata.getNumOfPoints());
+ }
+ }
+ reader.close();
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
new file mode 100644
index 0000000..462759c
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.constant.TestConstant;
+import
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LevelCompactionModsTest extends LevelCompactionTest {
+
+ File tempSGDir;
+
+ @Before
+ public void setUp() throws IOException, WriteProcessException,
MetadataException {
+ super.setUp();
+ tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+ tempSGDir.mkdirs();
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ FileUtils.deleteDirectory(tempSGDir);
+ }
+
+ @Test
+ public void testCompactionMods() throws IllegalPathException, IOException {
+ LevelCompactionTsFileManagement levelCompactionTsFileManagement = new
LevelCompactionTsFileManagement(
+ COMPACTION_TEST_SG, tempSGDir.getPath());
+ TsFileResource sourceTsFileResource = seqResources.get(0);
+ TsFileResource targetTsFileResource = seqResources.get(1);
+ List<Modification> filterModifications = new ArrayList<>();
+ Modification modification1;
+ Modification modification2;
+ try (ModificationFile sourceModificationFile = new ModificationFile(
+ sourceTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
+ modification1 = new Deletion(new PartialPath(deviceIds[0], "sensor0"),
0, 0);
+ modification2 = new Deletion(new PartialPath(deviceIds[0], "sensor1"),
0, 0);
+ sourceModificationFile.write(modification1);
+ sourceModificationFile.write(modification2);
+ filterModifications.add(modification1);
+ }
+ List<TsFileResource> sourceTsFileResources = new ArrayList<>();
+ sourceTsFileResources.add(sourceTsFileResource);
+ levelCompactionTsFileManagement
+ .renameLevelFilesMods(filterModifications, sourceTsFileResources,
targetTsFileResource);
+ try (ModificationFile targetModificationFile = new ModificationFile(
+ targetTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) {
+ Collection<Modification> modifications =
targetModificationFile.getModifications();
+ assertEquals(1, modifications.size());
+ assertEquals(modification2, modifications.stream().findFirst().get());
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index b401c03..d8a37a1 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -111,7 +111,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
+ ".tsfile")));
compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
CompactionUtils.merge(targetTsFileResource, new
ArrayList<>(seqResources.subList(0, 3)),
- COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
+ COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true, new
ArrayList<>());
compactionLogger.close();
levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
@@ -170,7 +170,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
+ ".tsfile")));
compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
CompactionUtils.merge(targetTsFileResource, new
ArrayList<>(seqResources.subList(0, 3)),
- COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
+ COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true, new
ArrayList<>());
compactionLogger.close();
BufferedReader logReader = new BufferedReader(
@@ -248,7 +248,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
+ ".tsfile")));
compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
CompactionUtils.merge(targetTsFileResource, new
ArrayList<>(seqResources.subList(0, 3)),
- COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
+ COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true, new
ArrayList<>());
compactionLogger.close();
BufferedReader logReader = new BufferedReader(
@@ -331,7 +331,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
+ ".tsfile")));
compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
CompactionUtils.merge(targetTsFileResource, new
ArrayList<>(seqResources.subList(0, 3)),
- COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), false);
+ COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), false, new
ArrayList<>());
compactionLogger.close();
levelCompactionTsFileManagement.addRecover(targetTsFileResource, false);
levelCompactionTsFileManagement.recover();
@@ -483,7 +483,7 @@ public class LevelCompactionRecoverTest extends
LevelCompactionTest {
+ ".tsfile")));
compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
CompactionUtils.merge(targetTsFileResource, new
ArrayList<>(seqResources.subList(0, 3)),
- COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
+ COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true, new
ArrayList<>());
levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
compactionLogger.close();
levelCompactionTsFileManagement.recover();