This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 52243ce [IOTDB-1357] Compaction use append chunk merge strategy when
chunk is already large (#3159) (#3188)
52243ce is described below
commit 52243ce47fa7c2806eb7e146cc4d91ee88354110
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Wed May 19 11:07:11 2021 +0800
[IOTDB-1357] Compaction use append chunk merge strategy when chunk is
already large (#3159) (#3188)
---
.../engine/compaction/utils/CompactionUtils.java | 81 +++++++++++++++++-----
.../db/engine/compaction/CompactionChunkTest.java | 4 +-
.../compaction/LevelCompactionMergeTest.java | 80 ++++++++++++++++++++-
3 files changed, 144 insertions(+), 21 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 9bc3ce2..a6aa68c 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -67,14 +67,12 @@ import static
org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData;
public class CompactionUtils {
private static final Logger logger =
LoggerFactory.getLogger(CompactionUtils.class);
- private static final int MERGE_PAGE_POINT_NUM =
-
IoTDBDescriptor.getInstance().getConfig().getMergePagePointNumberThreshold();
private CompactionUtils() {
throw new IllegalStateException("Utility class");
}
- private static Pair<ChunkMetadata, Chunk> readByAppendMerge(
+ private static Pair<ChunkMetadata, Chunk> readByAppendPageMerge(
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap)
throws IOException {
ChunkMetadata newChunkMetadata = null;
Chunk newChunk = null;
@@ -96,7 +94,7 @@ public class CompactionUtils {
return new Pair<>(newChunkMetadata, newChunk);
}
- private static void readByDeserializeMerge(
+ private static void readByDeserializePageMerge(
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap,
Map<Long, TimeValuePair> timeValuePairMap,
Map<String, List<Modification>> modificationCache,
@@ -122,14 +120,43 @@ public class CompactionUtils {
}
}
- public static void writeByAppendMerge(
+ /**
+ * When chunk is large enough, we do not have to merge them any more. Just
read chunks and write
+ * them to the new file directly.
+ */
+ public static void writeByAppendChunkMerge(
String device,
RateLimiter compactionWriteRateLimiter,
Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
TsFileResource targetResource,
RestorableTsFileIOWriter writer)
throws IOException {
- Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(entry.getValue());
+ Map<TsFileSequenceReader, List<ChunkMetadata>> readerListMap =
entry.getValue();
+ for (Entry<TsFileSequenceReader, List<ChunkMetadata>> readerListEntry :
+ readerListMap.entrySet()) {
+ TsFileSequenceReader reader = readerListEntry.getKey();
+ List<ChunkMetadata> chunkMetadataList = readerListEntry.getValue();
+ // read chunk and write it to new file directly
+ for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ Chunk chunk = reader.readMemChunk(chunkMetadata);
+ MergeManager.mergeRateLimiterAcquire(
+ compactionWriteRateLimiter,
+ (long) chunk.getHeader().getDataSize() +
chunk.getData().position());
+ writer.writeChunk(chunk, chunkMetadata);
+ targetResource.updateStartTime(device, chunkMetadata.getStartTime());
+ targetResource.updateEndTime(device, chunkMetadata.getEndTime());
+ }
+ }
+ }
+
+ public static void writeByAppendPageMerge(
+ String device,
+ RateLimiter compactionWriteRateLimiter,
+ Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
+ TsFileResource targetResource,
+ RestorableTsFileIOWriter writer)
+ throws IOException {
+ Pair<ChunkMetadata, Chunk> chunkPair =
readByAppendPageMerge(entry.getValue());
ChunkMetadata newChunkMetadata = chunkPair.left;
Chunk newChunk = chunkPair.right;
if (newChunkMetadata != null && newChunk != null) {
@@ -143,7 +170,7 @@ public class CompactionUtils {
}
}
- public static void writeByDeserializeMerge(
+ public static void writeByDeserializePageMerge(
String device,
RateLimiter compactionRateLimiter,
Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
@@ -154,7 +181,7 @@ public class CompactionUtils {
throws IOException, IllegalPathException {
Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap =
entry.getValue();
- readByDeserializeMerge(
+ readByDeserializePageMerge(
readerChunkMetadataMap,
timeValuePairMap,
modificationCache,
@@ -317,7 +344,7 @@ public class CompactionUtils {
sensorReaderChunkMetadataListEntry =
new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
if (!sequence) {
- writeByDeserializeMerge(
+ writeByDeserializePageMerge(
device,
compactionWriteRateLimiter,
sensorReaderChunkMetadataListEntry,
@@ -326,28 +353,50 @@ public class CompactionUtils {
modificationCache,
modifications);
} else {
+ boolean isChunkEnoughLarge = true;
boolean isPageEnoughLarge = true;
for (List<ChunkMetadata> chunkMetadatas :
readerChunkMetadataListMap.values()) {
for (ChunkMetadata chunkMetadata : chunkMetadatas) {
- if (chunkMetadata.getNumOfPoints() < MERGE_PAGE_POINT_NUM) {
+ if (chunkMetadata.getNumOfPoints()
+ < IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getMergePagePointNumberThreshold()) {
isPageEnoughLarge = false;
- break;
+ }
+ if (chunkMetadata.getNumOfPoints()
+ < IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getMergeChunkPointNumberThreshold()) {
+ isChunkEnoughLarge = false;
}
}
}
- if (isPageEnoughLarge) {
- logger.debug("{} [Compaction] page enough large, use append
merge", storageGroup);
+ // if a chunk is large enough, it's page must be large enough too
+ if (isChunkEnoughLarge) {
+ logger.debug(
+ "{} [Compaction] chunk enough large, use append chunk
merge", storageGroup);
+ // append page in chunks, so we do not have to deserialize a
chunk
+ writeByAppendChunkMerge(
+ device,
+ compactionWriteRateLimiter,
+ sensorReaderChunkMetadataListEntry,
+ targetResource,
+ writer);
+ } else if (isPageEnoughLarge) {
+ logger.debug(
+ "{} [Compaction] page enough large, use append page
merge", storageGroup);
// append page in chunks, so we do not have to deserialize a
chunk
- writeByAppendMerge(
+ writeByAppendPageMerge(
device,
compactionWriteRateLimiter,
sensorReaderChunkMetadataListEntry,
targetResource,
writer);
} else {
- logger.debug("{} [Compaction] page too small, use deserialize
merge", storageGroup);
+ logger.debug(
+ "{} [Compaction] page too small, use deserialize page
merge", storageGroup);
// we have to deserialize chunks to merge pages
- writeByDeserializeMerge(
+ writeByDeserializePageMerge(
device,
compactionWriteRateLimiter,
sensorReaderChunkMetadataListEntry,
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
index 8a61859..5e7276a 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
@@ -120,7 +120,7 @@ public class CompactionChunkTest extends
LevelCompactionTest {
}
for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry
:
measurementChunkMetadataMap.entrySet()) {
- CompactionUtils.writeByAppendMerge(
+ CompactionUtils.writeByAppendPageMerge(
device, compactionWriteRateLimiter, entry, targetTsfileResource,
writer);
}
reader.close();
@@ -199,7 +199,7 @@ public class CompactionChunkTest extends
LevelCompactionTest {
}
for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry
:
measurementChunkMetadataMap.entrySet()) {
- CompactionUtils.writeByDeserializeMerge(
+ CompactionUtils.writeByDeserializePageMerge(
device,
compactionWriteRateLimiter,
entry,
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 9b3984b..579fd88 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
@@ -47,6 +49,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -210,9 +213,6 @@ public class LevelCompactionMergeTest extends
LevelCompactionTest {
long count = 0L;
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- System.out.println(batchData.getTimeByIndex(i));
- }
count += batchData.length();
}
assertEquals(489, count);
@@ -225,6 +225,80 @@ public class LevelCompactionMergeTest extends
LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(prevPageLimit);
}
+ /** test append chunk merge, the chunk is already large than
merge_chunk_point_number */
+ @Test
+ public void testCompactionAppendChunkMerge() throws IOException {
+ int prevMergeChunkPointNumberThreshold =
+
IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
+
IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(1);
+
+ LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG,
tempSGDir.getPath());
+ levelCompactionTsFileManagement.addAll(seqResources, true);
+ levelCompactionTsFileManagement.addAll(unseqResources, false);
+ levelCompactionTsFileManagement.forkCurrentFileList(0);
+ CompactionMergeTask compactionMergeTask =
+ levelCompactionTsFileManagement
+ .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+ compactionMergeWorking = true;
+ compactionMergeTask.call();
+ while (compactionMergeWorking) {
+ // wait
+ }
+ TsFileResource newTsFileResource =
+ levelCompactionTsFileManagement.getTsFileListByTimePartition(true,
0).get(0);
+ TsFileSequenceReader tsFileSequenceReader =
+ new TsFileSequenceReader(newTsFileResource.getTsFilePath());
+ Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+ tsFileSequenceReader.readChunkMetadataInDevice(deviceIds[0]);
+ for (List<ChunkMetadata> chunkMetadataList :
sensorChunkMetadataListMap.values()) {
+ for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ assertEquals(20, chunkMetadata.getNumOfPoints());
+ }
+ }
+ tsFileSequenceReader.close();
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setMergeChunkPointNumberThreshold(prevMergeChunkPointNumberThreshold);
+ }
+
+ /** test not append chunk merge, the chunk is smaller than
merge_chunk_point_number */
+ @Test
+ public void testCompactionNoAppendChunkMerge() throws IOException {
+ int prevMergeChunkPointNumberThreshold =
+
IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
+
IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(100000);
+
+ LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG,
tempSGDir.getPath());
+ levelCompactionTsFileManagement.addAll(seqResources, true);
+ levelCompactionTsFileManagement.addAll(unseqResources, false);
+ levelCompactionTsFileManagement.forkCurrentFileList(0);
+ CompactionMergeTask compactionMergeTask =
+ levelCompactionTsFileManagement
+ .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+ compactionMergeWorking = true;
+ compactionMergeTask.call();
+ while (compactionMergeWorking) {
+ // wait
+ }
+ TsFileResource newTsFileResource =
+ levelCompactionTsFileManagement.getTsFileListByTimePartition(true,
0).get(0);
+ TsFileSequenceReader tsFileSequenceReader =
+ new TsFileSequenceReader(newTsFileResource.getTsFilePath());
+ Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+ tsFileSequenceReader.readChunkMetadataInDevice(deviceIds[0]);
+ for (List<ChunkMetadata> chunkMetadataList :
sensorChunkMetadataListMap.values()) {
+ for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ assertEquals(500, chunkMetadata.getNumOfPoints());
+ }
+ }
+ tsFileSequenceReader.close();
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setMergeChunkPointNumberThreshold(prevMergeChunkPointNumberThreshold);
+ }
+
/** close compaction merge callback, to release some locks */
private void closeCompactionMergeCallBack(
boolean isMergeExecutedInCurrentTask, long timePartitionId) {