This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c61fd434f0be58fa1b291fca9f9e8caebc5ff725 Author: 张凌哲 <[email protected]> AuthorDate: Thu Oct 8 13:07:44 2020 +0800 add merge page point number --- .../resources/conf/iotdb-engine.properties | 4 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 27 +++-- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 + .../tsfilemanagement/utils/HotCompactionUtils.java | 116 ++++++++++++++------- 4 files changed, 105 insertions(+), 46 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 97ded95..5fb3777 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -279,6 +279,10 @@ max_unseq_level_num=2 # When merge point number reaches this, merge the files to the last level. merge_chunk_point_number=100000 +# Work when tsfile_manage_strategy is level_strategy. +# When page point number of file reaches this, use append merge instead of deserialize merge. +merge_page_point_number=1000 + # How many thread will be set up to perform merge main tasks, 1 by default. # Set to 1 when less than or equal to 0. merge_thread_num=1 diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 5322c07..d53892f 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -261,13 +261,19 @@ public class IoTDBConfig { private int mergeChunkPointNumberThreshold = 100000; /** + * Work when tsfile_manage_strategy is level_strategy. When page point number of file reaches + * this, use append merge instead of deserialize merge. + */ + private int mergePagePointNumberThreshold = 1000; + + /** * TsFile manage strategy, define use which hot compaction strategy */ private TsFileManagementStrategy tsFileManagementStrategy = TsFileManagementStrategy.NORMAL_STRATEGY; /** - * Work when tsfile_manage_strategy is level_strategy. The max seq file num of each level. When file - * num exceeds this, the files in one level will merge to one. + * Work when tsfile_manage_strategy is level_strategy. The max seq file num of each level. When + * file num exceeds this, the files in one level will merge to one. */ private int maxFileNumInEachLevel = 10; @@ -277,8 +283,8 @@ public class IoTDBConfig { private int maxLevelNum = 4; /** - * Work when tsfile_manage_strategy is level_strategy. The max unseq file num of each level. When file - * num exceeds this, the files in one level will merge to one. + * Work when tsfile_manage_strategy is level_strategy. The max unseq file num of each level. When + * file num exceeds this, the files in one level will merge to one. */ private int maxUnseqFileNumInEachLevel = 10; @@ -617,8 +623,9 @@ public class IoTDBConfig { private int defaultFillInterval = -1; /** - * default TTL for storage groups that are not set TTL by statements, in ms - * Notice: if this property is changed, previous created storage group which are not set TTL will also be affected. + * default TTL for storage groups that are not set TTL by statements, in ms Notice: if this + * property is changed, previous created storage group which are not set TTL will also be + * affected. */ private long defaultTTL = Long.MAX_VALUE; @@ -1284,6 +1291,14 @@ public class IoTDBConfig { this.mergeChunkPointNumberThreshold = mergeChunkPointNumberThreshold; } + public int getMergePagePointNumberThreshold() { + return mergePagePointNumberThreshold; + } + + public void setMergePagePointNumberThreshold(int mergePagePointNumberThreshold) { + this.mergePagePointNumberThreshold = mergePagePointNumberThreshold; + } + public MergeFileStrategy getMergeFileStrategy() { return mergeFileStrategy; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index b77f4d0..0d35a0d 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -275,6 +275,10 @@ public class IoTDBDescriptor { .getProperty("merge_chunk_point_number", Integer.toString(conf.getMergeChunkPointNumberThreshold())))); + conf.setMergePagePointNumberThreshold(Integer.parseInt(properties + .getProperty("merge_page_point_number", + Integer.toString(conf.getMergePagePointNumberThreshold())))); + conf.setTsFileManagementStrategy(TsFileManagementStrategy.valueOf(properties .getProperty("tsfile_manage_strategy", conf.getTsFileManagementStrategy().toString()))); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java index 20aa0d2..35e5bd4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java @@ -24,7 +24,6 @@ import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair; import com.google.common.util.concurrent.RateLimiter; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -34,10 +33,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.merge.manage.MergeManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; @@ -56,12 +55,14 @@ import org.slf4j.LoggerFactory; public class HotCompactionUtils { private static final Logger logger = LoggerFactory.getLogger(HotCompactionUtils.class); + private static final int mergePagePointNum = IoTDBDescriptor.getInstance().getConfig() + .getMergePagePointNumberThreshold(); private HotCompactionUtils() { throw new IllegalStateException("Utility class"); } - private static Pair<ChunkMetadata, Chunk> readSeqChunk( + private static Pair<ChunkMetadata, Chunk> readByAppendMerge( Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException { ChunkMetadata newChunkMetadata = null; Chunk newChunk = null; @@ -81,7 +82,7 @@ public class HotCompactionUtils { return new Pair<>(newChunkMetadata, newChunk); } - private static long readUnseqChunk( + private static long readByDeserializeMerge( Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap, long maxVersion, Map<Long, TimeValuePair> timeValuePairMap) throws IOException { @@ -106,6 +107,54 @@ public class HotCompactionUtils { return maxVersion; } + private static long writeByAppendMerge(long maxVersion, String device, + RateLimiter compactionRateLimiter, + Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadatasMap, + TsFileResource targetResource, RestorableTsFileIOWriter writer) throws IOException { + Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(readerChunkMetadatasMap); + ChunkMetadata newChunkMetadata = chunkPair.left; + Chunk newChunk = chunkPair.right; + if (newChunkMetadata != null && newChunk != null) { + maxVersion = Math.max(newChunkMetadata.getVersion(), maxVersion); + // wait for limit write + MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, + newChunk.getHeader().getDataSize() + newChunk.getData().position()); + writer.writeChunk(newChunk, newChunkMetadata); + targetResource.updateStartTime(device, newChunkMetadata.getStartTime()); + targetResource.updateEndTime(device, newChunkMetadata.getEndTime()); + } + return maxVersion; + } + + private static long writeByDeserializeMerge(long maxVersion, String device, + RateLimiter compactionRateLimiter, + Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry, + TsFileResource targetResource, RestorableTsFileIOWriter writer) throws IOException { + Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>(); + maxVersion = readByDeserializeMerge(entry.getValue(), maxVersion, timeValuePairMap); + Iterator<List<ChunkMetadata>> chunkMetadataListIterator = entry.getValue().values() + .iterator(); + if (!chunkMetadataListIterator.hasNext()) { + return maxVersion; + } + List<ChunkMetadata> chunkMetadataList = chunkMetadataListIterator.next(); + if (chunkMetadataList.size() <= 0) { + return maxVersion; + } + IChunkWriter chunkWriter = new ChunkWriterImpl( + new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType())); + for (TimeValuePair timeValuePair : timeValuePairMap.values()) { + writeTVPair(timeValuePair, chunkWriter); + targetResource.updateStartTime(device, timeValuePair.getTimestamp()); + targetResource.updateEndTime(device, timeValuePair.getTimestamp()); + } + // wait for limit write + MergeManager + .mergeRateLimiterAcquire(compactionRateLimiter, chunkWriter.getCurrentChunkSize()); + chunkWriter.writeToFileWriter(writer); + return maxVersion; + } + private static Set<String> getTsFileDevicesSet(List<TsFileResource> subLevelResources, Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String storageGroup) throws IOException { @@ -177,51 +226,38 @@ public class HotCompactionUtils { long maxVersion = Long.MIN_VALUE; for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap .entrySet()) { - Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>(); - maxVersion = readUnseqChunk(entry.getValue(), maxVersion, timeValuePairMap); - Iterator<List<ChunkMetadata>> chunkMetadataListIterator = entry.getValue().values() - .iterator(); - if (!chunkMetadataListIterator.hasNext()) { - continue; - } - List<ChunkMetadata> chunkMetadataList = chunkMetadataListIterator.next(); - if (chunkMetadataList.size() <= 0) { - continue; - } - IChunkWriter chunkWriter = new ChunkWriterImpl( - new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType())); - for (TimeValuePair timeValuePair : timeValuePairMap.values()) { - writeTVPair(timeValuePair, chunkWriter); - targetResource.updateStartTime(device, timeValuePair.getTimestamp()); - targetResource.updateEndTime(device, timeValuePair.getTimestamp()); - } - // wait for limit write - MergeManager - .mergeRateLimiterAcquire(compactionRateLimiter, chunkWriter.getCurrentChunkSize()); - chunkWriter.writeToFileWriter(writer); - if (hotCompactionLogger != null) { - hotCompactionLogger.logDevice(device, writer.getPos()); - } + maxVersion = writeByDeserializeMerge(maxVersion, device, compactionRateLimiter, entry, + targetResource, writer); } writer.endChunkGroup(); writer.writeVersion(maxVersion); + if (hotCompactionLogger != null) { + hotCompactionLogger.logDevice(device, writer.getPos()); + } } else { + long maxVersion = Long.MIN_VALUE; for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap .entrySet()) { - Pair<ChunkMetadata, Chunk> chunkPair = readSeqChunk( - entry.getValue()); - ChunkMetadata newChunkMetadata = chunkPair.left; - Chunk newChunk = chunkPair.right; - if (newChunkMetadata != null && newChunk != null) { - // wait for limit write - MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, - (long)newChunk.getHeader().getDataSize() + newChunk.getData().position()); - writer.writeChunk(newChunk, newChunkMetadata); - targetResource.updateStartTime(device, newChunkMetadata.getStartTime()); - targetResource.updateEndTime(device, newChunkMetadata.getEndTime()); + Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadatasMap = entry.getValue(); + boolean isPageEnoughLarge = true; + for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadatasMap.values()) { + for (ChunkMetadata chunkMetadata : chunkMetadatas) { + if (chunkMetadata.getNumOfPoints() < mergePagePointNum) { + isPageEnoughLarge = false; + break; + } + } + } + if (isPageEnoughLarge) { + maxVersion = writeByAppendMerge(maxVersion, device, compactionRateLimiter, + readerChunkMetadatasMap, targetResource, writer); + } else { + maxVersion = writeByDeserializeMerge(maxVersion, device, compactionRateLimiter, entry, + targetResource, writer); } } writer.endChunkGroup(); + writer.writeVersion(maxVersion); if (hotCompactionLogger != null) { hotCompactionLogger.logDevice(device, writer.getPos()); }
