This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5667cf33f68f85f5463ccb89fd3a4ad4adf307c3 Author: 张凌哲 <[email protected]> AuthorDate: Sat Oct 10 12:11:58 2020 +0800 add read compaction limiter --- .../resources/conf/iotdb-engine.properties | 5 ++- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 23 ++++++++--- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 ++-- .../iotdb/db/engine/merge/manage/MergeManager.java | 33 ++++++++++++---- .../tsfilemanagement/utils/HotCompactionUtils.java | 44 ++++++++++++++++------ .../iotdb/db/engine/merge/MergeManagerTest.java | 2 +- .../iotdb/tsfile/file/metadata/ChunkMetadata.java | 3 ++ 7 files changed, 87 insertions(+), 30 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 5fb3777..819e561 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -325,7 +325,10 @@ force_full_merge=false chunk_merge_point_threshold=20480 # The limit of write throughput merge can reach per second -merge_throughput_mb_per_sec=16 +merge_write_throughput_mb_per_sec=16 + +# The limit of read throughput merge can reach per second +merge_read_throughput_mb_per_sec=16 #################### ### Metadata Cache Configuration diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index d53892f..c6163a6 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -543,7 +543,12 @@ public class IoTDBConfig { /** * The limit of write throughput merge can reach per second */ - private int mergeThroughputMbPerSec = 16; + private int mergeWriteThroughputMbPerSec = 16; + + /** + * The limit of read throughput merge can reach per second + */ + private int mergeReadThroughputMbPerSec = 16; private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM; @@ -1259,12 +1264,20 @@ public class IoTDBConfig { this.chunkMergePointThreshold = chunkMergePointThreshold; } - public int getMergeThroughputMbPerSec() { - return mergeThroughputMbPerSec; + public int getMergeWriteThroughputMbPerSec() { + return mergeWriteThroughputMbPerSec; + } + + public void setMergeWriteThroughputMbPerSec(int mergeWriteThroughputMbPerSec) { + this.mergeWriteThroughputMbPerSec = mergeWriteThroughputMbPerSec; + } + + public int getMergeReadThroughputMbPerSec() { + return mergeReadThroughputMbPerSec; } - public void setMergeThroughputMbPerSec(int mergeThroughputMbPerSec) { - this.mergeThroughputMbPerSec = mergeThroughputMbPerSec; + public void setMergeReadThroughputMbPerSec(int mergeReadThroughputMbPerSec) { + this.mergeReadThroughputMbPerSec = mergeReadThroughputMbPerSec; } public long getMemtableSizeThreshold() { diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 0d35a0d..084f412 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -23,7 +23,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; -import java.net.URI; import java.net.URL; import java.time.ZoneId; import java.util.Properties; @@ -369,8 +368,10 @@ public class IoTDBDescriptor { Boolean.toString(conf.isForceFullMerge())))); conf.setChunkMergePointThreshold(Integer.parseInt(properties.getProperty( "chunk_merge_point_threshold", Integer.toString(conf.getChunkMergePointThreshold())))); - conf.setMergeThroughputMbPerSec(Integer.parseInt(properties.getProperty( - "merge_throughput_mb_per_sec", Integer.toString(conf.getMergeThroughputMbPerSec())))); + conf.setMergeWriteThroughputMbPerSec(Integer.parseInt(properties.getProperty( + "merge_write_throughput_mb_per_sec", Integer.toString(conf.getMergeWriteThroughputMbPerSec())))); + conf.setMergeReadThroughputMbPerSec(Integer.parseInt(properties.getProperty( + "merge_read_throughput_mb_per_sec", Integer.toString(conf.getMergeReadThroughputMbPerSec())))); conf.setEnablePartialInsert( Boolean.parseBoolean(properties.getProperty("enable_partial_insert", diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java index d8bcfb5..3ba8bed 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java @@ -60,7 +60,8 @@ public class MergeManager implements IService, MergeManagerMBean { private final String mbeanName = String .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, getID().getJmxName()); - private final RateLimiter mergeRateLimiter = RateLimiter.create(Double.MAX_VALUE); + private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE); + private final RateLimiter mergeReadRateLimiter = RateLimiter.create(Double.MAX_VALUE); private AtomicInteger threadCnt = new AtomicInteger(); private ThreadPoolExecutor mergeTaskPool; @@ -74,13 +75,18 @@ public class MergeManager implements IService, MergeManagerMBean { private MergeManager() { } - public RateLimiter getMergeRateLimiter() { - setMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeThroughputMbPerSec()); - return mergeRateLimiter; + public RateLimiter getMergeWriteRateLimiter() { + setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec()); + return mergeWriteRateLimiter; + } + + public RateLimiter getMergeReadRateLimiter() { + setReadMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeReadThroughputMbPerSec()); + return mergeReadRateLimiter; } /** - * wait by throughoutMbPerSec limit to avoid continuous Write + * wait by throughoutMbPerSec limit to avoid continuous Write Or Read */ public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength) { while (bytesLength >= Integer.MAX_VALUE) { @@ -92,14 +98,25 @@ public class MergeManager implements IService, MergeManagerMBean { } } - private void setMergeRate(final double throughoutMbPerSec) { + private void setWriteMergeRate(final double throughoutMbPerSec) { + double throughout = throughoutMbPerSec * 1024.0 * 1024.0; + // if throughout = 0, disable rate limiting + if (throughout == 0) { + throughout = Double.MAX_VALUE; + } + if (mergeWriteRateLimiter.getRate() != throughout) { + mergeWriteRateLimiter.setRate(throughout); + } + } + + private void setReadMergeRate(final double throughoutMbPerSec) { double throughout = throughoutMbPerSec * 1024.0 * 1024.0; // if throughout = 0, disable rate limiting if (throughout == 0) { throughout = Double.MAX_VALUE; } - if (mergeRateLimiter.getRate() != throughout) { - mergeRateLimiter.setRate(throughout); + if (mergeReadRateLimiter.getRate() != throughout) { + mergeReadRateLimiter.setRate(throughout); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java index 411b754..d7dcd4c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java @@ -62,7 +62,7 @@ public class HotCompactionUtils { throw new IllegalStateException("Utility class"); } - private static Pair<ChunkMetadata, Chunk> readByAppendMerge( + private static Pair<ChunkMetadata, Chunk> readByAppendMerge(RateLimiter compactionReadRateLimiter, Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException { ChunkMetadata newChunkMetadata = null; Chunk newChunk = null; @@ -70,6 +70,8 @@ public class HotCompactionUtils { .entrySet()) { for (ChunkMetadata chunkMetadata : entry.getValue()) { Chunk chunk = entry.getKey().readMemChunk(chunkMetadata); + MergeManager + .mergeRateLimiterAcquire(compactionReadRateLimiter, chunk.getData().position()); if (newChunkMetadata == null) { newChunkMetadata = chunkMetadata; newChunk = chunk; @@ -82,7 +84,7 @@ public class HotCompactionUtils { return new Pair<>(newChunkMetadata, newChunk); } - private static long readByDeserializeMerge( + private static long readByDeserializeMerge(RateLimiter compactionReadRateLimiter, Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap, long maxVersion, Map<Long, TimeValuePair> timeValuePairMap) throws IOException { @@ -94,30 +96,35 @@ public class HotCompactionUtils { maxVersion = Math.max(chunkMetadata.getVersion(), maxVersion); IChunkReader chunkReader = new ChunkReaderByTimestamp( reader.readMemChunk(chunkMetadata)); + long chunkSize = 0; while (chunkReader.hasNextSatisfiedPage()) { IPointReader iPointReader = new BatchDataIterator( chunkReader.nextPageData()); while (iPointReader.hasNextTimeValuePair()) { TimeValuePair timeValuePair = iPointReader.nextTimeValuePair(); + chunkSize += timeValuePair.getSize(); timeValuePairMap.put(timeValuePair.getTimestamp(), timeValuePair); } } + MergeManager + .mergeRateLimiterAcquire(compactionReadRateLimiter, chunkSize); } } return maxVersion; } private static long writeByAppendMerge(long maxVersion, String device, - RateLimiter compactionRateLimiter, + RateLimiter compactionWriteRateLimiter, RateLimiter compactionReadRateLimiter, Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadatasMap, TsFileResource targetResource, RestorableTsFileIOWriter writer) throws IOException { - Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(readerChunkMetadatasMap); + Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(compactionReadRateLimiter, + readerChunkMetadatasMap); ChunkMetadata newChunkMetadata = chunkPair.left; Chunk newChunk = chunkPair.right; if (newChunkMetadata != null && newChunk != null) { maxVersion = Math.max(newChunkMetadata.getVersion(), maxVersion); // wait for limit write - MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, + MergeManager.mergeRateLimiterAcquire(compactionWriteRateLimiter, newChunk.getHeader().getDataSize() + newChunk.getData().position()); writer.writeChunk(newChunk, newChunkMetadata); targetResource.updateStartTime(device, newChunkMetadata.getStartTime()); @@ -127,11 +134,12 @@ public class HotCompactionUtils { } private static long writeByDeserializeMerge(long maxVersion, String device, - RateLimiter compactionRateLimiter, + RateLimiter compactionRateLimiter, RateLimiter compactionReadRateLimiter, Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry, TsFileResource targetResource, RestorableTsFileIOWriter writer) throws IOException { Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>(); - maxVersion = readByDeserializeMerge(entry.getValue(), maxVersion, timeValuePairMap); + maxVersion = readByDeserializeMerge(compactionReadRateLimiter, entry.getValue(), maxVersion, + timeValuePairMap); Iterator<List<ChunkMetadata>> chunkMetadataListIterator = entry.getValue().values() .iterator(); if (!chunkMetadataListIterator.hasNext()) { @@ -185,7 +193,8 @@ public class HotCompactionUtils { Set<String> devices, boolean sequence) throws IOException { RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile()); Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>(); - RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeRateLimiter(); + RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter(); + RateLimiter compactionReadRateLimiter = MergeManager.getINSTANCE().getMergeReadRateLimiter(); Set<String> tsFileDevicesMap = getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup); for (String device : tsFileDevicesMap) { @@ -200,8 +209,10 @@ public class HotCompactionUtils { tsFileSequenceReaderMap, storageGroup); Map<String, List<ChunkMetadata>> chunkMetadataMap = reader .readChunkMetadataInDevice(device); + long chunkMetadataSize = 0; for (Entry<String, List<ChunkMetadata>> entry : chunkMetadataMap.entrySet()) { for (ChunkMetadata chunkMetadata : entry.getValue()) { + chunkMetadataSize += chunkMetadata.getStatistics().calculateRamSize(); Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap; String measurementUid = chunkMetadata.getMeasurementUid(); if (measurementChunkMetadataMap.containsKey(measurementUid)) { @@ -221,12 +232,17 @@ public class HotCompactionUtils { .put(chunkMetadata.getMeasurementUid(), readerChunkMetadataMap); } } + // wait for limit read + MergeManager + .mergeRateLimiterAcquire(compactionReadRateLimiter, chunkMetadataSize); } if (!sequence) { long maxVersion = Long.MIN_VALUE; for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap .entrySet()) { - maxVersion = writeByDeserializeMerge(maxVersion, device, compactionRateLimiter, entry, + maxVersion = writeByDeserializeMerge(maxVersion, device, compactionWriteRateLimiter, + compactionReadRateLimiter, + entry, targetResource, writer); } writer.endChunkGroup(); @@ -250,11 +266,15 @@ public class HotCompactionUtils { } if (isPageEnoughLarge) { logger.info("{} [Hot Compaction] page enough large, use append merge", storageGroup); - maxVersion = writeByAppendMerge(maxVersion, device, compactionRateLimiter, + maxVersion = writeByAppendMerge(maxVersion, device, compactionWriteRateLimiter, + compactionReadRateLimiter, readerChunkMetadatasMap, targetResource, writer); } else { - logger.info("{} [Hot Compaction] page enough large, use deserialize merge", storageGroup); - maxVersion = writeByDeserializeMerge(maxVersion, device, compactionRateLimiter, entry, + logger + .info("{} [Hot Compaction] page enough large, use deserialize merge", storageGroup); + maxVersion = writeByDeserializeMerge(maxVersion, device, compactionWriteRateLimiter, + compactionReadRateLimiter, + entry, targetResource, writer); } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java index d03abc0..ecbeb63 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java @@ -33,7 +33,7 @@ public class MergeManagerTest extends MergeTest { @Test public void testRateLimiter() { - RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeRateLimiter(); + RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter(); long startTime = System.currentTimeMillis(); MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 160 * 1024 * 1024L); assertTrue((System.currentTimeMillis() - startTime) <= 1000); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java index da68c93..45a45b4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java @@ -260,6 +260,9 @@ public class ChunkMetadata implements Accountable { this.ramSize = size; } + /** + * must use calculate ram size first + */ @Override public long getRamSize() { return ramSize;
