This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch active_flush in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 14d7cd90baf31e7c6b51f8e304ae26aea7ab83d9 Author: qiaojialin <[email protected]> AuthorDate: Wed May 27 19:53:56 2020 +0800 add avg_series_point_number_threshold in config --- docs/UserGuide/Server/Config Manual.md | 36 ++++++++++++++++++++++ docs/zh/UserGuide/Server/Config Manual.md | 36 ++++++++++++++++++++++ .../resources/conf/iotdb-engine.properties | 2 ++ .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +++ .../iotdb/db/engine/memtable/AbstractMemTable.java | 17 ++++++++++ .../apache/iotdb/db/engine/memtable/IMemTable.java | 5 +++ .../db/engine/storagegroup/TsFileProcessor.java | 9 ++++-- 8 files changed, 119 insertions(+), 3 deletions(-) diff --git a/docs/UserGuide/Server/Config Manual.md b/docs/UserGuide/Server/Config Manual.md index 86f8165..5b84e06 100644 --- a/docs/UserGuide/Server/Config Manual.md +++ b/docs/UserGuide/Server/Config Manual.md @@ -272,6 +272,42 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access. |Default| true | |Effective|Trigger| +* enable\_parameter\_adapter + +|Name| enable\_parameter\_adapter | +|:---:|:---| +|Description| enable dynamically adjusting system to avoid OOM| +|Type|Bool| +|Default| true | +|Effective|After restart system| + +* memtable\_size\_threshold + +|Name| memtable\_size\_threshold | +|:---:|:---| +|Description| max memtable size| +|Type|Long| +|Default| false | +|Effective| when enable\_parameter\_adapter is false & After restart system| + +* avg\_series\_point\_number\_threshold + +|Name| avg\_series\_point\_number\_threshold | +|:---:|:---| +|Description| max average number of point of each series in memtable| +|Type|Int32| +|Default| 2048 | +|Effective|After restart system| + +* tsfile\_size\_threshold + +|Name| tsfile\_size\_threshold | +|:---:|:---| +|Description| max tsfile size| +|Type|Long| +|Default| false | +|Effective| when enable\_parameter\_adapter is false & After restart system| + * enable\_partition |Name| enable\_partition | diff --git a/docs/zh/UserGuide/Server/Config Manual.md b/docs/zh/UserGuide/Server/Config Manual.md index d75c978..549db0a 100644 --- a/docs/zh/UserGuide/Server/Config Manual.md +++ b/docs/zh/UserGuide/Server/Config Manual.md @@ -253,6 +253,42 @@ |默认值| 0 | |改后生效方式|重启服务器生效| +* enable\_parameter\_adapter + +|Name| enable\_parameter\_adapter | +|:---:|:---| +|Description| 开启自动调整系统参数,避免爆内存| +|Type|Bool| +|Default| true | +|Effective|重启服务器生效| + +* memtable\_size\_threshold + +|Name| memtable\_size\_threshold | +|:---:|:---| +|Description| 内存缓冲区 memtable 阈值| +|Type|Long| +|Default| false | +|Effective|enable\_parameter\_adapter为false时生效、重启服务器生效| + +* avg\_series\_point\_number\_threshold + +|Name| avg\_series\_point\_number\_threshold | +|:---:|:---| +|Description| 内存中平均每个时间序列点数最大值,达到触发flush| +|Type|Int32| +|Default| 2048 | +|Effective|重启服务器生效| + +* tsfile\_size\_threshold + +|Name| tsfile\_size\_threshold | +|:---:|:---| +|Description| 每个 tsfile 大小| +|Type|Long| +|Default| false | +|Effective|enable\_parameter\_adapter为false时生效、重启服务器生效| + * enable\_partition |Name| enable\_partition | diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index e892740..15c4d8a 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -181,6 +181,8 @@ tsfile_size_threshold=536870912 # When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB. memtable_size_threshold=1073741824 +avg_series_point_number_threshold=2048 + # How many threads can concurrently flush. When <= 0, use CPU core number. concurrent_flush_thread=0 diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 64d87fa..3d6ec9a 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -230,6 +230,11 @@ public class IoTDBConfig { private long memtableSizeThreshold = 128 * 1024 * 1024L; /** + * When average series point number reaches this, flush the memtable to disk + */ + private float avgSeriesPointNumberThreshold = 2048; + + /** * whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */ private boolean metaDataCacheEnable = true; @@ -1106,6 +1111,14 @@ public class IoTDBConfig { this.memtableSizeThreshold = memtableSizeThreshold; } + public float getAvgSeriesPointNumberThreshold() { + return avgSeriesPointNumberThreshold; + } + + public void setAvgSeriesPointNumberThreshold(float avgSeriesPointNumberThreshold) { + this.avgSeriesPointNumberThreshold = avgSeriesPointNumberThreshold; + } + public MergeFileStrategy getMergeFileStrategy() { return mergeFileStrategy; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 40599ff..7699bba 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -237,6 +237,10 @@ public class IoTDBDescriptor { conf.setMemtableSizeThreshold(memTableSizeThreshold); } + conf.setAvgSeriesPointNumberThreshold(Float.parseFloat(properties + .getProperty("avg_series_point_number_threshold", + Float.toString(conf.getAvgSeriesPointNumberThreshold())))); + conf.setSyncEnable(Boolean .parseBoolean(properties.getProperty("is_sync_enable", Boolean.toString(conf.isSyncEnable())))); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 5292938..4d406fe 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -49,6 +49,10 @@ public abstract class AbstractMemTable implements IMemTable { private long memSize = 0; + private float seriesNumber = 0; + + private float averagePointNumber = 0; + public AbstractMemTable() { this.memTableMap = new HashMap<>(); } @@ -79,6 +83,7 @@ public abstract class AbstractMemTable implements IMemTable { Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId); if (!memSeries.containsKey(measurement)) { memSeries.put(measurement, genMemSeries(schema)); + seriesNumber++; } return memSeries.get(measurement); } @@ -95,6 +100,8 @@ public abstract class AbstractMemTable implements IMemTable { memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value); + averagePointNumber += insertPlan.getPaths().size() / seriesNumber; + write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i], insertPlan.getSchemas()[i], insertPlan.getTime(), value); } @@ -110,6 +117,9 @@ public abstract class AbstractMemTable implements IMemTable { write(insertTabletPlan, start, end); long recordSizeInByte = MemUtils.getRecordSize(insertTabletPlan, start, end); memSize += recordSizeInByte; + averagePointNumber += + (float) (insertTabletPlan.getMeasurements().length * insertTabletPlan.getRowCount()) + / seriesNumber; } catch (RuntimeException e) { throw new WriteProcessException(e.getMessage()); } @@ -151,6 +161,11 @@ public abstract class AbstractMemTable implements IMemTable { } @Override + public float getAveragePointNumber() { + return averagePointNumber; + } + + @Override public void clear() { memTableMap.clear(); modifications.clear(); @@ -219,6 +234,8 @@ public abstract class AbstractMemTable implements IMemTable { @Override public void release() { + seriesNumber = 0; + averagePointNumber = 0; for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) { for (Entry<String, IWritableMemChunk> subEntry : entry.getValue().entrySet()) { TVListAllocator.getInstance().release(subEntry.getValue().getTVList()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index c84d497..ed7fa15 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -56,6 +56,11 @@ public interface IMemTable { */ long memSize(); + /** + * @return average number of points in each WritableChunk + */ + float getAveragePointNumber(); + void insert(InsertPlan insertPlan) throws WriteProcessException; /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index b0f308e..2c8dd8a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -70,6 +70,9 @@ public class TsFileProcessor { private static final Logger logger = LoggerFactory.getLogger(TsFileProcessor.class); private final String storageGroupName; + + private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + /** * sync this object in query() and asyncTryToFlush() */ @@ -260,8 +263,9 @@ public class TsFileProcessor { boolean shouldFlush() { - return workMemTable != null - && workMemTable.memSize() > getMemtableSizeThresholdBasedOnSeriesNum(); + return workMemTable != null && + (workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum() + || workMemTable.getAveragePointNumber() >= config.getAvgSeriesPointNumberThreshold()); } /** @@ -272,7 +276,6 @@ public class TsFileProcessor { * size. We need to adjust it according to the number of timeseries in a specific storage group. */ private long getMemtableSizeThresholdBasedOnSeriesNum() { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); if(!config.isEnableParameterAdapter()){ return config.getMemtableSizeThreshold(); }
