This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch WalBufferPoolConfig in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e2f1fcc9ab2c0e84d9038cb526a59cc44be4d706 Author: JackieTien97 <[email protected]> AuthorDate: Thu Feb 25 11:35:40 2021 +0800 change the config --- .../resources/conf/iotdb-engine.properties | 9 +++ .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 32 +++++++-- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 28 ++++++-- .../engine/storagegroup/StorageGroupProcessor.java | 82 +++++++++++----------- 4 files changed, 95 insertions(+), 56 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 81e209e..6af92de 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -414,6 +414,15 @@ enable_stat_monitor=false enable_monitor_series_write=false #################### +### WAL Direct Buffer Pool Configuration +#################### +# the interval to trim the wal pool +wal_pool_trim_interval_ms=10000 + +# the max number of wal bytebuffers that can be allocated for each time partition; if there is no unseq data, you can set it to 4. 
+max_wal_bytebuffer_num_for_each_partition=8 + +#################### ### External sort Configuration #################### # Is external sort enable diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 0c98992..1e5ccfd 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -18,6 +18,11 @@ */ package org.apache.iotdb.db.conf; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; + +import java.io.File; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy; @@ -31,16 +36,9 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.fileSystem.FSType; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; - public class IoTDBConfig { /* Names of Watermark methods */ @@ -167,6 +165,10 @@ public class IoTDBConfig { */ private int walBufferSize = 16 * 1024 * 1024; + private int maxWalBytebufferNumForEachPartition = 8; + + private long walPoolTrimIntervalInMS = 10_000; + private int estimatedSeriesSize = 300; /** @@ -1144,6 +1146,22 @@ public class IoTDBConfig { this.walBufferSize = walBufferSize; } + public int getMaxWalBytebufferNumForEachPartition() { + return maxWalBytebufferNumForEachPartition; + } + + public void setMaxWalBytebufferNumForEachPartition(int maxWalBytebufferNumForEachPartition) { + 
this.maxWalBytebufferNumForEachPartition = maxWalBytebufferNumForEachPartition; + } + + public long getWalPoolTrimIntervalInMS() { + return walPoolTrimIntervalInMS; + } + + public void setWalPoolTrimIntervalInMS(long walPoolTrimIntervalInMS) { + this.walPoolTrimIntervalInMS = walPoolTrimIntervalInMS; + } + public int getEstimatedSeriesSize() { return estimatedSeriesSize; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index f26a1b5..dfadc8c 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -260,13 +260,6 @@ public class IoTDBDescriptor { conf.setWalDir(properties.getProperty("wal_dir", conf.getWalDir())); - int walBufferSize = - Integer.parseInt( - properties.getProperty("wal_buffer_size", Integer.toString(conf.getWalBufferSize()))); - if (walBufferSize > 0) { - conf.setWalBufferSize(walBufferSize); - } - int mlogBufferSize = Integer.parseInt( properties.getProperty( @@ -819,6 +812,27 @@ public class IoTDBDescriptor { properties.getProperty( "enable_discard_out_of_order_data", Boolean.toString(conf.isEnableDiscardOutOfOrderData())))); + + int walBufferSize = + Integer.parseInt( + properties.getProperty("wal_buffer_size", Integer.toString(conf.getWalBufferSize()))); + if (walBufferSize > 0) { + conf.setWalBufferSize(walBufferSize); + } + + int maxWalBytebufferNumForEachPartition = + Integer.parseInt( + properties.getProperty("max_wal_bytebuffer_num_for_each_partition", Integer.toString(conf.getMaxWalBytebufferNumForEachPartition()))); + if (maxWalBytebufferNumForEachPartition > 0) { + conf.setMaxWalBytebufferNumForEachPartition(maxWalBytebufferNumForEachPartition); + } + + long poolTrimIntervalInMS = + Integer.parseInt( + properties.getProperty("wal_pool_trim_interval_ms", Long.toString(conf.getWalPoolTrimIntervalInMS()))); + if (poolTrimIntervalInMS > 0) { + 
conf.setWalPoolTrimIntervalInMS(poolTrimIntervalInMS); + } } private void loadAutoCreateSchemaProps(Properties properties) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 61b692c..edd1c42 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -18,6 +18,38 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; +import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -77,43 +109,9 @@ 
import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; - -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Deque; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; -import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; -import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - /** * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one * TsFileProcessor in the working status. 
<br> @@ -254,11 +252,6 @@ public class StorageGroupProcessor { private static final int WAL_BUFFER_SIZE = IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2; - private static final int MAX_WAL_BYTEBUFFER_NUM = - IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition() * 4; - - private static final long DEFAULT_POOL_TRIM_INTERVAL_MILLIS = 10_000; - private final Deque<ByteBuffer> walByteBufferPool = new LinkedList<>(); private int currentWalPoolSize = 0; @@ -277,6 +270,9 @@ public class StorageGroupProcessor { public ByteBuffer[] getWalDirectByteBuffer() { ByteBuffer[] res = new ByteBuffer[2]; synchronized (walByteBufferPool) { + long startTime = System.nanoTime(); + int MAX_WAL_BYTEBUFFER_NUM = + config.getConcurrentWritingTimePartition() * config.getMaxWalBytebufferNumForEachPartition(); while (walByteBufferPool.isEmpty() && currentWalPoolSize + 2 > MAX_WAL_BYTEBUFFER_NUM) { try { walByteBufferPool.wait(); @@ -288,6 +284,8 @@ public class StorageGroupProcessor { virtualStorageGroupId, e); } + logger.info("Waiting {} ms for wal direct byte buffer.", + (System.nanoTime() - startTime) / 1_000_000); } // If the queue is not empty, it must have at least two. 
if (!walByteBufferPool.isEmpty()) { @@ -337,7 +335,7 @@ public class StorageGroupProcessor { // we will trim the size to expectedSize until the pool is empty while (expectedSize < currentWalPoolSize && !walByteBufferPool.isEmpty() - && poolNotEmptyIntervalInMS >= DEFAULT_POOL_TRIM_INTERVAL_MILLIS) { + && poolNotEmptyIntervalInMS >= config.getWalPoolTrimIntervalInMS()) { MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast()); MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast()); currentWalPoolSize -= 2; @@ -380,8 +378,8 @@ public class StorageGroupProcessor { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleWithFixedDelay( this::trimTask, - DEFAULT_POOL_TRIM_INTERVAL_MILLIS, - DEFAULT_POOL_TRIM_INTERVAL_MILLIS, + config.getWalPoolTrimIntervalInMS(), + config.getWalPoolTrimIntervalInMS(), TimeUnit.MILLISECONDS); recover(); }
