This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch fix_oom in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit fadc454475973bed14d4ac991ed6c1086a08ee30 Author: lta <[email protected]> AuthorDate: Mon Apr 20 21:29:52 2020 +0800 modify new memtable size estimation and limit the total memory size of the write log to at most allocateMemoryForWrite / 10 --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../db/conf/adapter/IoTDBConfigDynamicAdapter.java | 11 ++++-- .../iotdb/db/engine/memtable/AbstractMemTable.java | 4 +-- .../engine/storagegroup/StorageGroupProcessor.java | 1 + .../db/engine/storagegroup/TsFileProcessor.java | 2 ++ .../java/org/apache/iotdb/db/utils/MemUtils.java | 42 +++++++++++----------- .../db/writelog/node/ExclusiveWriteLogNode.java | 8 +++-- 7 files changed, 39 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index a1dee4d..31984d1 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -943,7 +943,7 @@ public class IoTDBConfig { return walBufferSize; } - void setWalBufferSize(int walBufferSize) { + public void setWalBufferSize(int walBufferSize) { this.walBufferSize = walBufferSize; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java index 2069043..110e477 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java @@ -150,6 +150,8 @@ public class IoTDBConfigDynamicAdapter implements IDynamicAdapter { if (canAdjust) { CONFIG.setMaxMemtableNumber(maxMemTableNum); + CONFIG.setWalBufferSize( + (int) Math.min(Integer.MAX_VALUE, allocateMemoryForWrite / 10 / maxMemTableNum)); CONFIG.setTsFileSizeThreshold(tsFileSizeThreshold); 
CONFIG.setMemtableSizeThreshold(memtableSizeInByte); if (LOGGER.isDebugEnabled() && initialized) { @@ -217,14 +219,16 @@ public class IoTDBConfigDynamicAdapter implements IDynamicAdapter { @Override public void addOrDeleteStorageGroup(int diff) throws ConfigAdjusterException { totalStorageGroup += diff; - maxMemTableNum += IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff; - if(!CONFIG.isEnableParameterAdapter()){ + maxMemTableNum += + IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff; + if (!CONFIG.isEnableParameterAdapter()) { CONFIG.setMaxMemtableNumber(maxMemTableNum); return; } if (!tryToAdaptParameters()) { totalStorageGroup -= diff; - maxMemTableNum -= IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff; + maxMemTableNum -= + IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff; throw new ConfigAdjusterException(CREATE_STORAGE_GROUP); } } @@ -286,4 +290,5 @@ public class IoTDBConfigDynamicAdapter implements IDynamicAdapter { } } + } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index f9de3c7..7297c61 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -93,11 +93,11 @@ public abstract class AbstractMemTable implements IMemTable { Object value = CommonUtils.parseValue(insertPlan.getSchemas()[i].getType(), insertPlan.getValues()[i]); + memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value); + write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i], insertPlan.getSchemas()[i], insertPlan.getTime(), value); } - long recordSizeInByte = MemUtils.getRecordSize(insertPlan); - memSize += recordSizeInByte; } catch (QueryProcessException e) { throw new 
WriteProcessException(e.getMessage()); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 273148d..d97fdcd 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.cache.RamUsageEstimator; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy; import org.apache.iotdb.db.engine.merge.manage.MergeManager; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 7e554cf..a231c36 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.conf.adapter.CompressionRatio; import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter; +import org.apache.iotdb.db.engine.cache.RamUsageEstimator; import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.engine.flush.MemTableFlushTask; import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable; @@ -88,6 +89,7 @@ public class TsFileProcessor { */ private volatile boolean shouldClose; private IMemTable workMemTable; + private VersionController versionController; /** * this callback is called after 
the corresponding TsFile is called endFile(). diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java index 5229c48..edeb1f1 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java @@ -19,8 +19,10 @@ package org.apache.iotdb.db.utils; import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.engine.cache.RamUsageEstimator; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint; @@ -43,29 +45,25 @@ public class MemUtils { } /** - * function for getting the record size. + * function for getting the value size. */ - public static long getRecordSize(InsertPlan insertPlan) { - long memSize = 0; - for (int i = 0; i < insertPlan.getValues().length; i++) { - switch (insertPlan.getSchemas()[i].getType()) { - case INT32: - memSize += 8L + 4L; break; - case INT64: - memSize += 8L + 8L; break; - case FLOAT: - memSize += 8L + 4L; break; - case DOUBLE: - memSize += 8L + 8L; break; - case BOOLEAN: - memSize += 8L + 1L; break; - case TEXT: - memSize += 8L + insertPlan.getValues()[i].length(); break; - default: - memSize += 8L + 8L; - } + public static long getRecordSize(TSDataType dataType, Object value) { + switch (dataType) { + case INT32: + return 8L + 4L; + case INT64: + return 8L + 8L; + case FLOAT: + return 8L + 4L; + case DOUBLE: + return 8L + 8L; + case BOOLEAN: + return 8L + 1L; + case TEXT: + return 8L + RamUsageEstimator.sizeOf(value); + default: + return 8L + 8L; } - return memSize; } public static long getRecordSize(BatchInsertPlan batchInsertPlan, int start, int end) { @@ -88,7 +86,7 @@ public 
class MemUtils { case TEXT: memSize += (end - start) * 8L; for (int j = start; j < end; j++) { - memSize += ((Binary[]) batchInsertPlan.getColumns()[i])[j].getLength(); + memSize += RamUsageEstimator.sizeOf(((Binary[]) batchInsertPlan.getColumns()[i])[j]); } break; default: diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java index b9b5e0a..dda1a7e 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java @@ -46,7 +46,6 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive public static final String WAL_FILE_NAME = "wal"; private static final Logger logger = LoggerFactory.getLogger(ExclusiveWriteLogNode.class); - private static int logBufferSize = IoTDBDescriptor.getInstance().getConfig().getWalBufferSize(); private String identifier; @@ -56,7 +55,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - private ByteBuffer logBuffer = ByteBuffer.allocate(logBufferSize); + private ByteBuffer logBuffer = ByteBuffer + .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()); private ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -88,7 +88,9 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive sync(); } } catch (BufferOverflowException e) { - throw new IOException("Log cannot fit into buffer, please increase wal_buffer_size", e); + throw new IOException( + "Log cannot fit into buffer, if you don't enable Dynamic Parameter Adapter, please increase wal_buffer_size;" + + "otherwise, please increase the JVM memory", e); } finally { lock.writeLock().unlock(); }
