This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9337d450c0e Delete write mem control parameters (#12007)
9337d450c0e is described below
commit 9337d450c0ef3acce44c0a9914078decac699698
Author: YuFengLiu <[email protected]>
AuthorDate: Thu Feb 1 10:11:37 2024 +0800
Delete write mem control parameters (#12007)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 48 -------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 40 ------
.../db/storageengine/dataregion/DataRegion.java | 47 +++---
.../impl/ReadChunkCompactionPerformer.java | 1 -
.../execute/task/InnerSpaceCompactionTask.java | 11 +-
.../task/InsertionCrossSpaceCompactionTask.java | 12 +-
.../writer/AbstractCrossCompactionWriter.java | 9 +-
.../writer/AbstractInnerCompactionWriter.java | 8 --
.../compaction/io/CompactionTsFileWriter.java | 5 +-
.../compaction/schedule/CompactionWorker.java | 22 ++-
.../estimator/AbstractCrossSpaceEstimator.java | 3 -
.../estimator/AbstractInnerSpaceEstimator.java | 6 +-
.../dataregion/flush/MemTableFlushTask.java | 14 +-
.../dataregion/memtable/AbstractMemTable.java | 14 +-
.../memtable/AlignedWritableMemChunk.java | 6 +-
.../dataregion/memtable/PrimitiveMemTable.java | 5 -
.../dataregion/memtable/TsFileProcessor.java | 159 +++++++--------------
.../dataregion/memtable/WritableMemChunk.java | 2 +-
.../rescon/memory/MemTableManager.java | 60 +-------
.../db/storageengine/rescon/memory/SystemInfo.java | 28 +---
.../java/org/apache/iotdb/db/utils/MemUtils.java | 68 +++------
.../db/utils/datastructure/AlignedTVList.java | 4 +-
.../iotdb/db/utils/datastructure/BinaryTVList.java | 2 +-
.../iotdb/db/utils/datastructure/TVList.java | 8 +-
.../compaction/CompactionSchedulerTest.java | 1 -
.../compaction/utils/CompactionConfigRestorer.java | 4 -
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 -
.../org/apache/iotdb/db/utils/MemUtilsTest.java | 87 ++++++++++-
.../resources/conf/iotdb-common.properties | 15 --
.../write/writer/RestorableTsFileIOWriter.java | 1 -
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 52 +------
.../writer/TsFileIOWriterMemoryControlTest.java | 34 ++---
32 files changed, 247 insertions(+), 533 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 26659d87f80..850786efcca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -359,9 +359,6 @@ public class IoTDBConfig {
*/
private int maxPendingWindowEvaluationTasks = 64;
- /** Is the write mem control for writing enable. */
- private boolean enableMemControl = true;
-
/** Is the write ahead log enable. */
private boolean enableIndex = false;
@@ -383,9 +380,6 @@ public class IoTDBConfig {
/** When a sequence TsFile's file size (in byte) exceed this, the TsFile is
forced closed. */
private long seqTsFileSize = 0L;
- /** When a memTable's size (in byte) exceeds this, the memtable is flushed
to disk. Unit: byte */
- private long memtableSizeThreshold = 1024 * 1024 * 1024L;
-
/** Whether to timed flush sequence tsfiles' memtables. */
private boolean enableTimedFlushSeqMemtable = true;
@@ -460,13 +454,6 @@ public class IoTDBConfig {
*/
private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
- /**
- * Enable compaction memory control or not. If true and estimated memory
size of one compaction
- * task exceeds the threshold, system will block the compaction. It only
works for cross space
- * compaction currently.
- */
- private boolean enableCompactionMemControl = true;
-
private double chunkMetadataSizeProportion = 0.1;
/** The target tsfile size in compaction, 2 GB by default */
@@ -758,9 +745,6 @@ public class IoTDBConfig {
/** kerberos principal */
private String kerberosPrincipal = "your principal";
- /** the num of memtable in each database */
- private int concurrentWritingTimePartition = 1;
-
/** the default fill interval in LinearFill and PreviousFill, -1 means
infinite past time */
private int defaultFillInterval = -1;
@@ -1207,14 +1191,6 @@ public class IoTDBConfig {
this.udfInitialByteArrayLengthForMemoryControl =
udfInitialByteArrayLengthForMemoryControl;
}
- public int getConcurrentWritingTimePartition() {
- return concurrentWritingTimePartition;
- }
-
- public void setConcurrentWritingTimePartition(int
concurrentWritingTimePartition) {
- this.concurrentWritingTimePartition = concurrentWritingTimePartition;
- }
-
public int getDefaultFillInterval() {
return defaultFillInterval;
}
@@ -2028,22 +2004,6 @@ public class IoTDBConfig {
this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
}
- public boolean isEnableMemControl() {
- return enableMemControl;
- }
-
- public void setEnableMemControl(boolean enableMemControl) {
- this.enableMemControl = enableMemControl;
- }
-
- public long getMemtableSizeThreshold() {
- return memtableSizeThreshold;
- }
-
- public void setMemtableSizeThreshold(long memtableSizeThreshold) {
- this.memtableSizeThreshold = memtableSizeThreshold;
- }
-
public boolean isEnableTimedFlushSeqMemtable() {
return enableTimedFlushSeqMemtable;
}
@@ -2752,14 +2712,6 @@ public class IoTDBConfig {
this.compactionPriority = compactionPriority;
}
- public boolean isEnableCompactionMemControl() {
- return enableCompactionMemControl;
- }
-
- public void setEnableCompactionMemControl(boolean
enableCompactionMemControl) {
- this.enableCompactionMemControl = enableCompactionMemControl;
- }
-
public long getTargetCompactionFileSize() {
return targetCompactionFileSize;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 39fb8f51a29..0188501b1cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -365,22 +365,6 @@ public class IoTDBDescriptor {
Integer.parseInt(
properties.getProperty("batch_size",
Integer.toString(conf.getBatchSize()))));
- conf.setEnableMemControl(
- (Boolean.parseBoolean(
- properties.getProperty(
- "enable_mem_control",
Boolean.toString(conf.isEnableMemControl())))));
- LOGGER.info("IoTDB enable memory control: {}", conf.isEnableMemControl());
-
- long memTableSizeThreshold =
- Long.parseLong(
- properties
- .getProperty(
- "memtable_size_threshold",
Long.toString(conf.getMemtableSizeThreshold()))
- .trim());
- if (memTableSizeThreshold > 0) {
- conf.setMemtableSizeThreshold(memTableSizeThreshold);
- }
-
conf.setTvListSortAlgorithm(
TVListSortAlgorithm.valueOf(
properties.getProperty(
@@ -475,12 +459,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"compaction_priority",
conf.getCompactionPriority().toString())));
- conf.setEnableCompactionMemControl(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_compaction_mem_control",
- Boolean.toString(conf.isEnableCompactionMemControl()))));
-
int subtaskNum =
Integer.parseInt(
properties.getProperty(
@@ -748,13 +726,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"device_path_cache_size",
String.valueOf(conf.getDevicePathCacheSize()))));
- // the num of memtables in each database
- conf.setConcurrentWritingTimePartition(
- Integer.parseInt(
- properties.getProperty(
- "concurrent_writing_time_partition",
- String.valueOf(conf.getConcurrentWritingTimePartition()))));
-
// the default fill interval in LinearFill and PreviousFill
conf.setDefaultFillInterval(
Integer.parseInt(
@@ -1556,17 +1527,6 @@ public class IoTDBDescriptor {
// update timed flush & close conf
loadTimedService(properties);
StorageEngine.getInstance().rebootTimedService();
-
- long memTableSizeThreshold =
- Long.parseLong(
- properties
- .getProperty(
- "memtable_size_threshold",
Long.toString(conf.getMemtableSizeThreshold()))
- .trim());
- if (memTableSizeThreshold > 0) {
- conf.setMemtableSizeThreshold(memTableSizeThreshold);
- }
-
// update params of creating schemaengine automatically
loadAutoCreateSchemaProps(properties);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index e51fdba5a4c..632c99ea44a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -194,7 +194,6 @@ public class DataRegion implements IDataRegionForQuery {
private static final Logger logger =
LoggerFactory.getLogger(DataRegion.class);
- private final boolean enableMemControl = config.isEnableMemControl();
/**
* a read write lock for guaranteeing concurrent safety when accessing all
fields in this class
* (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -773,21 +772,19 @@ public class DataRegion implements IDataRegionForQuery {
tsFileResource.removeResourceFile();
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
- if (enableMemControl) {
- TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(dataRegionInfo);
- tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
- // get chunkMetadata size
- long chunkMetadataSize = 0;
- for (Map<String, List<ChunkMetadata>> metaMap :
writer.getMetadatasForQuery().values()) {
- for (List<ChunkMetadata> metadatas : metaMap.values()) {
- for (ChunkMetadata chunkMetadata : metadatas) {
- chunkMetadataSize += chunkMetadata.getRetainedSizeInBytes();
- }
+ TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(dataRegionInfo);
+ tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
+ this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
+ // get chunkMetadata size
+ long chunkMetadataSize = 0;
+ for (Map<String, List<ChunkMetadata>> metaMap :
writer.getMetadatasForQuery().values()) {
+ for (List<ChunkMetadata> metadatas : metaMap.values()) {
+ for (ChunkMetadata chunkMetadata : metadatas) {
+ chunkMetadataSize += chunkMetadata.getRetainedSizeInBytes();
}
}
- tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize);
}
+ tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize);
}
tsFileManager.add(tsFileResource, recoverPerformer.isSequence());
} catch (Throwable e) {
@@ -872,9 +869,7 @@ public class DataRegion implements IDataRegionForQuery {
throw new OutOfTTLException(
insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() -
dataTTL));
}
- if (enableMemControl) {
- StorageEngine.blockInsertionIfReject(null);
- }
+ StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
writeLock("InsertRow");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
@@ -928,9 +923,7 @@ public class DataRegion implements IDataRegionForQuery {
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
public void insertTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
- if (enableMemControl) {
- StorageEngine.blockInsertionIfReject(null);
- }
+ StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
writeLock("insertTablet");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
@@ -1421,11 +1414,9 @@ public class DataRegion implements IDataRegionForQuery {
this::flushCallback,
sequence);
- if (enableMemControl) {
- TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(dataRegionInfo);
- tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
- }
+ TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(dataRegionInfo);
+ tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
+ this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
tsFileProcessor.addFlushListeners(customFlushListeners);
@@ -3055,9 +3046,7 @@ public class DataRegion implements IDataRegionForQuery {
*/
public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
throws WriteProcessException, BatchProcessException {
- if (enableMemControl) {
- StorageEngine.blockInsertionIfReject(null);
- }
+ StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
writeLock("InsertRowsOfOneDevice");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
@@ -3131,9 +3120,7 @@ public class DataRegion implements IDataRegionForQuery {
public void insert(InsertRowsNode insertRowsNode)
throws BatchProcessException, WriteProcessRejectException {
- if (enableMemControl) {
- StorageEngine.blockInsertionIfReject(null);
- }
+ StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
writeLock("InsertRows");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index daed9052bbf..cb3ebb47885 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -75,7 +75,6 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
CompactionTsFileWriter writer =
new CompactionTsFileWriter(
targetResource.getTsFile(),
- true,
sizeForFileWriter,
CompactionType.INNER_SEQ_COMPACTION)) {
while (deviceIterator.hasNextDevice()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 5a5538bd6bc..5219ab6bbf1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
@@ -132,12 +131,10 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
this.selectedTsFileResourceList = selectedTsFileResourceList;
this.sequence = sequence;
this.performer = performer;
- if
(IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) {
- if (this.performer instanceof ReadChunkCompactionPerformer) {
- innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
- } else if (!sequence && this.performer instanceof
FastCompactionPerformer) {
- innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
- }
+ if (this.performer instanceof ReadChunkCompactionPerformer) {
+ innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
+ } else if (!sequence && this.performer instanceof FastCompactionPerformer)
{
+ innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
}
isHoldingWriteLock = new boolean[selectedTsFileResourceList.size()];
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
index e4c5ac2deac..d5049899bfc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
@@ -283,13 +283,10 @@ public class InsertionCrossSpaceCompactionTask extends
AbstractCompactionTask {
private boolean shouldRollback() {
// if target file or its responding file does not exist, then return true
- if (targetFile == null
+ return targetFile == null
|| !targetFile.tsFileExists()
|| !targetFile.resourceFileExists()
- || (unseqFileToInsert.modFileExists() && !targetFile.modFileExists()))
{
- return true;
- }
- return false;
+ || (unseqFileToInsert.modFileExists() && !targetFile.modFileExists());
}
private void rollback() throws IOException {
@@ -303,7 +300,7 @@ public class InsertionCrossSpaceCompactionTask extends
AbstractCompactionTask {
FileMetrics.getInstance().deleteTsFile(true,
Collections.singletonList(targetFile));
}
// delete target file
- if (targetFile != null && !deleteTsFileOnDisk(targetFile)) {
+ if (!deleteTsFileOnDisk(targetFile)) {
throw new CompactionRecoverException(
String.format("failed to delete target file %s", targetFile));
}
@@ -322,9 +319,6 @@ public class InsertionCrossSpaceCompactionTask extends
AbstractCompactionTask {
@Override
public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
- if (!(otherTask instanceof InsertionCrossSpaceCompactionTask)) {
- return false;
- }
return false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index 6bc3a1af406..11306464044 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -74,12 +74,10 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
/
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
*
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()
/ targetResources.size());
- boolean enableMemoryControl =
IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
for (int i = 0; i < targetResources.size(); i++) {
this.targetFileWriters.add(
new CompactionTsFileWriter(
targetResources.get(i).getTsFile(),
- enableMemoryControl,
memorySizeForEachWriter,
CompactionType.CROSS_COMPACTION));
isEmptyFile[i] = true;
@@ -94,8 +92,8 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
this.isAlign = isAlign;
this.seqFileIndexArray = new int[subTaskNum];
checkIsDeviceExistAndGetDeviceEndTime();
- for (int i = 0; i < targetFileWriters.size(); i++) {
- chunkGroupHeaderSize =
targetFileWriters.get(i).startChunkGroup(deviceId);
+ for (CompactionTsFileWriter targetFileWriter : targetFileWriters) {
+ chunkGroupHeaderSize = targetFileWriter.startChunkGroup(deviceId);
}
}
@@ -166,8 +164,7 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
@Override
public void checkAndMayFlushChunkMetadata() throws IOException {
- for (int i = 0; i < targetFileWriters.size(); i++) {
- CompactionTsFileWriter fileIoWriter = targetFileWriters.get(i);
+ for (CompactionTsFileWriter fileIoWriter : targetFileWriters) {
fileIoWriter.checkMetadataSizeAndMayFlush();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index d3d8cf00b1b..de191b394c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -25,7 +25,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFi
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -38,23 +37,16 @@ public abstract class AbstractInnerCompactionWriter extends
AbstractCompactionWr
protected TsFileResource targetResource;
- protected long targetPageSize =
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-
- protected long targetPagePointNum =
- TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
-
protected AbstractInnerCompactionWriter(TsFileResource targetFileResource)
throws IOException {
long sizeForFileWriter =
(long)
((double) SystemInfo.getInstance().getMemorySizeForCompaction()
/
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
*
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
- boolean enableMemoryControl =
IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
this.targetResource = targetFileResource;
this.fileWriter =
new CompactionTsFileWriter(
targetFileResource.getTsFile(),
- enableMemoryControl,
sizeForFileWriter,
targetResource.isSeq()
? CompactionType.INNER_SEQ_COMPACTION
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
index 834814746f8..6783d84bfe5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
@@ -43,10 +43,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
private volatile boolean isWritingAligned = false;
private boolean isEmptyTargetFile = true;
- public CompactionTsFileWriter(
- File file, boolean enableMemoryControl, long maxMetadataSize,
CompactionType type)
+ public CompactionTsFileWriter(File file, long maxMetadataSize,
CompactionType type)
throws IOException {
- super(file, enableMemoryControl, maxMetadataSize);
+ super(file, maxMetadataSize);
this.type = type;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
index 677ba75b647..4193e6697fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
@@ -85,19 +84,16 @@ public class CompactionWorker implements Runnable {
return false;
}
task.transitSourceFilesToMerging();
- if
(IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) {
- estimatedMemoryCost = task.getEstimatedMemoryCost();
- if (estimatedMemoryCost < 0) {
- return false;
- }
- CompactionTaskType taskType = task.getCompactionTaskType();
- memoryAcquired =
- SystemInfo.getInstance().addCompactionMemoryCost(taskType,
estimatedMemoryCost, 60);
- CompactionMetrics.getInstance()
- .updateCompactionMemoryMetrics(taskType, estimatedMemoryCost);
- CompactionMetrics.getInstance()
- .updateCompactionTaskSelectedFileNum(taskType,
task.getAllSourceTsFiles().size());
+ estimatedMemoryCost = task.getEstimatedMemoryCost();
+ if (estimatedMemoryCost < 0) {
+ return false;
}
+ CompactionTaskType taskType = task.getCompactionTaskType();
+ memoryAcquired =
+ SystemInfo.getInstance().addCompactionMemoryCost(taskType,
estimatedMemoryCost, 60);
+ CompactionMetrics.getInstance().updateCompactionMemoryMetrics(taskType,
estimatedMemoryCost);
+ CompactionMetrics.getInstance()
+ .updateCompactionTaskSelectedFileNum(taskType,
task.getAllSourceTsFiles().size());
fileHandleAcquired =
SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60);
CompactionTaskSummary summary = task.getSummary();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
index 2c5a6acd2b5..2207fd7f51a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
@@ -33,9 +33,6 @@ public abstract class AbstractCrossSpaceEstimator extends
AbstractCompactionEsti
public long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, List<TsFileResource> unseqResources)
throws IOException {
- if (!config.isEnableCompactionMemControl()) {
- return 0;
- }
List<TsFileResource> resources = new ArrayList<>(seqResources.size() +
unseqResources.size());
resources.addAll(seqResources);
resources.addAll(unseqResources);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
index ae2c6f2e016..50cf624a5c1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
@@ -31,14 +31,10 @@ import java.util.List;
public abstract class AbstractInnerSpaceEstimator extends
AbstractCompactionEstimator {
public long estimateInnerCompactionMemory(List<TsFileResource> resources)
throws IOException {
- if (!config.isEnableCompactionMemControl()) {
- return 0;
- }
-
if (!CompactionEstimateUtils.addReadLock(resources)) {
return -1L;
}
- long cost = 0;
+ long cost;
try {
if (!isAllSourceFileExist(resources)) {
return -1L;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 68ccba4bcec..ab5440d5358 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -60,7 +60,7 @@ public class MemTableFlushTask {
private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER =
FlushSubTaskPoolManager.getInstance();
private static final WritingMetrics WRITING_METRICS =
WritingMetrics.getInstance();
- private static IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
/* storage group name -> last time */
private static final Map<String, Long> flushPointsCache = new
ConcurrentHashMap<>();
private final Future<?> encodingTaskFuture;
@@ -69,7 +69,7 @@ public class MemTableFlushTask {
private final LinkedBlockingQueue<Object> encodingTaskQueue = new
LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Object> ioTaskQueue =
- (config.isEnableMemControl() &&
SystemInfo.getInstance().isEncodingFasterThanIo())
+ (SystemInfo.getInstance().isEncodingFasterThanIo())
? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
: new LinkedBlockingQueue<>();
@@ -118,7 +118,7 @@ public class MemTableFlushTask {
avgSeriesPointsNum);
long estimatedTemporaryMemSize = 0L;
- if (config.isEnableMemControl() &&
SystemInfo.getInstance().isEncodingFasterThanIo()) {
+ if (SystemInfo.getInstance().isEncodingFasterThanIo()) {
estimatedTemporaryMemSize =
memTable.getSeriesNumber() == 0
? 0
@@ -192,12 +192,10 @@ public class MemTableFlushTask {
throw new ExecutionException(e);
}
- if (config.isEnableMemControl()) {
- if (estimatedTemporaryMemSize != 0) {
-
SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
- }
- SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >=
memSerializeTime);
+ if (estimatedTemporaryMemSize != 0) {
+
SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
}
+ SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >=
memSerializeTime);
MetricService.getInstance()
.timer(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 0b2ff4a04cc..e54e7ef2c36 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -66,12 +66,6 @@ public abstract class AbstractMemTable implements IMemTable {
/** DeviceId -> chunkGroup(MeasurementId -> chunk). */
private final Map<IDeviceID, IWritableMemChunkGroup> memTableMap;
- /**
- * The initial value is true because we want to calculate the text data size
when recover
- * memTable.
- */
- protected boolean disableMemControl = true;
-
private boolean shouldFlush = false;
private volatile FlushStatus flushStatus = FlushStatus.WORKING;
private final int avgSeriesPointNumThreshold =
@@ -205,7 +199,7 @@ public abstract class AbstractMemTable implements IMemTable
{
dataTypes.add(schema.getType());
}
}
- memSize += MemUtils.getRecordsSize(dataTypes, values, disableMemControl);
+ memSize += MemUtils.getRowRecordSize(dataTypes, values);
write(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(),
values);
int pointsInserted =
@@ -252,7 +246,7 @@ public abstract class AbstractMemTable implements IMemTable
{
if (schemaList.isEmpty()) {
return;
}
- memSize += MemUtils.getAlignedRecordsSize(dataTypes, values,
disableMemControl);
+ memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values);
writeAlignedRow(insertRowNode.getDeviceID(), schemaList,
insertRowNode.getTime(), values);
int pointsInserted =
insertRowNode.getMeasurements().length -
insertRowNode.getFailedMeasurementNumber();
@@ -276,7 +270,7 @@ public abstract class AbstractMemTable implements IMemTable
{
throws WriteProcessException {
try {
writeTabletNode(insertTabletNode, start, end);
- memSize += MemUtils.getTabletSize(insertTabletNode, start, end,
disableMemControl);
+ memSize += MemUtils.getTabletSize(insertTabletNode, start, end);
int pointsInserted =
(insertTabletNode.getDataTypes().length -
insertTabletNode.getFailedMeasurementNumber())
* (end - start);
@@ -302,7 +296,7 @@ public abstract class AbstractMemTable implements IMemTable
{
throws WriteProcessException {
try {
writeAlignedTablet(insertTabletNode, start, end);
- memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end,
disableMemControl);
+ memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end);
int pointsInserted =
(insertTabletNode.getDataTypes().length -
insertTabletNode.getFailedMeasurementNumber())
* (end - start);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index efacc8afc15..64a56ee3ac5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -60,8 +58,6 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
- private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
-
public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
this.measurementIndexMap = new LinkedHashMap<>();
List<TSDataType> dataTypeList = new ArrayList<>();
@@ -399,7 +395,7 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
tsDataType == TSDataType.TEXT
? list.getBinaryByValueIndex(sortedRowIndex,
columnIndex)
: null,
- CONFIG.isEnableMemControl());
+ true);
CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
}
continue;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
index f8e625a3841..70b0f909381 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
@@ -32,11 +32,6 @@ public class PrimitiveMemTable extends AbstractMemTable {
super(database, dataRegionId);
}
- public PrimitiveMemTable(String database, String dataRegionId, boolean
enableMemControl) {
- super(database, dataRegionId);
- this.disableMemControl = !enableMemControl;
- }
-
public PrimitiveMemTable(
String database, String dataRegionId, Map<IDeviceID,
IWritableMemChunkGroup> memTableMap) {
super(database, dataRegionId, memTableMap);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 5bbf9ac7fe3..953606ed136 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -106,9 +106,6 @@ public class TsFileProcessor {
/** IoTDB config. */
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- /** whether it's enable mem control. */
- private final boolean enableMemControl = config.isEnableMemControl();
-
/** database info for mem control. */
private final DataRegionInfo dataRegionInfo;
/** tsfile processor info for mem control. */
@@ -251,22 +248,21 @@ public class TsFileProcessor {
}
long[] memIncrements = null;
- if (enableMemControl) {
- long startTime = System.nanoTime();
- if (insertRowNode.isAligned()) {
- memIncrements =
- checkAlignedMemCostAndAddToTspInfo(
- insertRowNode.getDevicePath().getFullPath(),
insertRowNode.getMeasurements(),
- insertRowNode.getDataTypes(), insertRowNode.getValues());
- } else {
- memIncrements =
- checkMemCostAndAddToTspInfo(
- insertRowNode.getDevicePath().getFullPath(),
insertRowNode.getMeasurements(),
- insertRowNode.getDataTypes(), insertRowNode.getValues());
- }
- // recordScheduleMemoryBlockCost
- costsForMetrics[1] += System.nanoTime() - startTime;
+
+ long memControlStartTime = System.nanoTime();
+ if (insertRowNode.isAligned()) {
+ memIncrements =
+ checkAlignedMemCostAndAddToTspInfo(
+ insertRowNode.getDevicePath().getFullPath(),
insertRowNode.getMeasurements(),
+ insertRowNode.getDataTypes(), insertRowNode.getValues());
+ } else {
+ memIncrements =
+ checkMemCostAndAddToTspInfo(
+ insertRowNode.getDevicePath().getFullPath(),
insertRowNode.getMeasurements(),
+ insertRowNode.getDataTypes(), insertRowNode.getValues());
}
+ // recordScheduleMemoryBlockCost
+ costsForMetrics[1] += System.nanoTime() - memControlStartTime;
long startTime = System.nanoTime();
WALFlushListener walFlushListener;
@@ -276,9 +272,7 @@ public class TsFileProcessor {
throw walFlushListener.getCause();
}
} catch (Exception e) {
- if (enableMemControl) {
- rollbackMemoryInfo(memIncrements);
- }
+ rollbackMemoryInfo(memIncrements);
throw new WriteProcessException(
String.format(
"%s: %s write WAL failed",
@@ -323,7 +317,7 @@ public class TsFileProcessor {
costsForMetrics[3] += System.nanoTime() - startTime;
}
- private void createNewWorkingMemTable() throws WriteProcessException {
+ private void createNewWorkingMemTable() {
workMemTable =
MemTableManager.getInstance()
.getAvailableMemTable(
@@ -356,29 +350,27 @@ public class TsFileProcessor {
long[] memIncrements = null;
try {
- if (enableMemControl) {
- long startTime = System.nanoTime();
- if (insertTabletNode.isAligned()) {
- memIncrements =
- checkAlignedMemCostAndAddToTsp(
- insertTabletNode.getDevicePath().getFullPath(),
- insertTabletNode.getMeasurements(),
- insertTabletNode.getDataTypes(),
- insertTabletNode.getColumns(),
- start,
- end);
- } else {
- memIncrements =
- checkMemCostAndAddToTspInfo(
- insertTabletNode.getDevicePath().getFullPath(),
- insertTabletNode.getMeasurements(),
- insertTabletNode.getDataTypes(),
- insertTabletNode.getColumns(),
- start,
- end);
- }
-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() -
startTime);
+ long startTime = System.nanoTime();
+ if (insertTabletNode.isAligned()) {
+ memIncrements =
+ checkAlignedMemCostAndAddToTsp(
+ insertTabletNode.getDevicePath().getFullPath(),
+ insertTabletNode.getMeasurements(),
+ insertTabletNode.getDataTypes(),
+ insertTabletNode.getColumns(),
+ start,
+ end);
+ } else {
+ memIncrements =
+ checkMemCostAndAddToTspInfo(
+ insertTabletNode.getDevicePath().getFullPath(),
+ insertTabletNode.getMeasurements(),
+ insertTabletNode.getDataTypes(),
+ insertTabletNode.getColumns(),
+ start,
+ end);
}
+
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() -
startTime);
} catch (WriteProcessException e) {
for (int i = start; i < end; i++) {
results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT,
e.getMessage());
@@ -397,9 +389,7 @@ public class TsFileProcessor {
for (int i = start; i < end; i++) {
results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
}
- if (enableMemControl) {
- rollbackMemoryInfo(memIncrements);
- }
+ rollbackMemoryInfo(memIncrements);
throw new WriteProcessException(e);
} finally {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(System.nanoTime() -
startTime);
@@ -800,13 +790,6 @@ public class TsFileProcessor {
if (workMemTable.shouldFlush()) {
return true;
}
- if (!enableMemControl && workMemTable.memSize() >=
getMemtableSizeThresholdBasedOnSeriesNum()) {
- logger.info(
- "The memtable size {} of tsfile {} reaches the threshold",
- workMemTable.memSize(),
- tsFileResource.getTsFile().getAbsolutePath());
- return true;
- }
if (workMemTable.reachTotalPointNumThreshold()) {
logger.info(
"The avg series points num {} of tsfile {} reaches the threshold",
@@ -819,10 +802,6 @@ public class TsFileProcessor {
return false;
}
- private long getMemtableSizeThresholdBasedOnSeriesNum() {
- return config.getMemtableSizeThreshold();
- }
-
public boolean shouldClose() {
long fileSize = tsFileResource.getTsFileSize();
long fileSizeThreshold = sequence ? config.getSeqTsFileSize() :
config.getUnSeqTsFileSize();
@@ -1047,9 +1026,7 @@ public class TsFileProcessor {
lastWorkMemtableFlushTime = System.currentTimeMillis();
updateLatestFlushTimeCallback.call(this, lastTimeForEachDevice,
lastWorkMemtableFlushTime);
- if (enableMemControl) {
-
SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
- }
+
SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
flushingMemTables.addLast(tobeFlushed);
if (logger.isDebugEnabled()) {
logger.debug(
@@ -1096,21 +1073,19 @@ public class TsFileProcessor {
}
memTable.release();
MemTableManager.getInstance().decreaseMemtableNumber();
- if (enableMemControl) {
- // reset the mem cost in StorageGroupProcessorInfo
-
dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
- if (logger.isDebugEnabled()) {
- logger.debug(
- "[mem control] {}: {} flush finished, try to reset system
memcost, "
- + "flushing memtable list size: {}",
- storageGroupName,
- tsFileResource.getTsFile().getName(),
- flushingMemTables.size());
- }
- // report to System
- SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
-
SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
+ // reset the mem cost in StorageGroupProcessorInfo
+ dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "[mem control] {}: {} flush finished, try to reset system memcost,
"
+ + "flushing memtable list size: {}",
+ storageGroupName,
+ tsFileResource.getTsFile().getName(),
+ flushingMemTables.size());
}
+ // report to System
+ SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
+
SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
if (logger.isDebugEnabled()) {
logger.debug(
"{}: {} flush finished, remove a memtable from flushing list, "
@@ -1381,10 +1356,8 @@ public class TsFileProcessor {
closeFileListener.onClosed(this);
}
- if (enableMemControl) {
- tsFileProcessorInfo.clear();
- dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
- }
+ tsFileProcessorInfo.clear();
+ dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
writer = null;
}
@@ -1400,10 +1373,8 @@ public class TsFileProcessor {
for (CloseFileListener closeFileListener : closeFileListeners) {
closeFileListener.onClosed(this);
}
- if (enableMemControl) {
- tsFileProcessorInfo.clear();
- dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
- }
+ tsFileProcessorInfo.clear();
+ dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
logger.info(
"Storage group {} close and remove empty file {}",
storageGroupName,
@@ -1425,16 +1396,6 @@ public class TsFileProcessor {
this.managedByFlushManager = managedByFlushManager;
}
- /** sync method */
- public boolean isMemtableNotNull() {
- flushQueryLock.writeLock().lock();
- try {
- return workMemTable != null;
- } finally {
- flushQueryLock.writeLock().unlock();
- }
- }
-
/** close this tsfile */
public void close() throws TsFileProcessorException {
try {
@@ -1559,10 +1520,6 @@ public class TsFileProcessor {
}
}
- public TsFileProcessorInfo getTsFileProcessorInfo() {
- return tsFileProcessorInfo;
- }
-
public void setTsFileProcessorInfo(TsFileProcessorInfo tsFileProcessorInfo) {
this.tsFileProcessorInfo = tsFileProcessorInfo;
}
@@ -1581,10 +1538,6 @@ public class TsFileProcessor {
return workMemTable != null ? workMemTable.getUpdateTime() :
Long.MAX_VALUE;
}
- public long getLastWorkMemtableFlushTime() {
- return lastWorkMemtableFlushTime;
- }
-
public boolean isSequence() {
return sequence;
}
@@ -1593,10 +1546,6 @@ public class TsFileProcessor {
workMemTable.setShouldFlush();
}
- public void addFlushListener(FlushListener listener) {
- flushListeners.add(listener);
- }
-
public void addCloseFileListener(CloseFileListener listener) {
closeFileListeners.add(listener);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 8d062396f49..c1a71d32a53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -337,7 +337,7 @@ public class WritableMemChunk implements IWritableMemChunk {
MemUtils.getRecordSize(
tsDataType,
tsDataType == TSDataType.TEXT ? list.getBinary(sortedRowIndex)
: null,
- CONFIG.isEnableMemControl());
+ true);
CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
continue;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/MemTableManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/MemTableManager.java
index 5f44e11099e..ef6f4edc3cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/MemTableManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/MemTableManager.java
@@ -18,23 +18,10 @@
*/
package org.apache.iotdb.db.storageengine.rescon.memory;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class MemTableManager {
-
- private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
-
- private static final Logger logger =
LoggerFactory.getLogger(MemTableManager.class);
-
- private static final int WAIT_TIME = 100;
- public static final int MEMTABLE_NUM_FOR_EACH_PARTITION = 4;
private int currentMemtableNumber = 0;
private MemTableManager() {}
@@ -43,36 +30,9 @@ public class MemTableManager {
return InstanceHolder.INSTANCE;
}
- public synchronized IMemTable getAvailableMemTable(String storageGroup,
String dataRegionId)
- throws WriteProcessException {
- if (CONFIG.isEnableMemControl()) {
- currentMemtableNumber++;
- return new PrimitiveMemTable(storageGroup, dataRegionId,
CONFIG.isEnableMemControl());
- }
-
- if (!reachMaxMemtableNumber()) {
- currentMemtableNumber++;
- return new PrimitiveMemTable(storageGroup, dataRegionId);
- }
-
- // wait until the total number of memtable is less than the system capacity
- int waitCount = 1;
- while (true) {
- if (!reachMaxMemtableNumber()) {
- currentMemtableNumber++;
- return new PrimitiveMemTable(storageGroup, dataRegionId);
- }
- try {
- wait(WAIT_TIME);
- } catch (InterruptedException e) {
- logger.error("{} fails to wait for memtables {}, continue to wait",
storageGroup, e);
- Thread.currentThread().interrupt();
- throw new WriteProcessException(e);
- }
- if (waitCount++ % 10 == 0) {
- logger.info("{} has waited for a memtable for {}ms", storageGroup,
waitCount * WAIT_TIME);
- }
- }
+ public synchronized IMemTable getAvailableMemTable(String storageGroup,
String dataRegionId) {
+ currentMemtableNumber++;
+ return new PrimitiveMemTable(storageGroup, dataRegionId);
}
public int getCurrentMemtableNumber() {
@@ -84,20 +44,6 @@ public class MemTableManager {
notifyAll();
}
- /** Called when memory control is disabled */
- private boolean reachMaxMemtableNumber() {
- return currentMemtableNumber >= CONFIG.getMaxMemtableNumber();
- }
-
- /** Called when memory control is disabled */
- public synchronized void addOrDeleteStorageGroup(int diff) {
- int maxMemTableNum = CONFIG.getMaxMemtableNumber();
- maxMemTableNum +=
- MEMTABLE_NUM_FOR_EACH_PARTITION *
CONFIG.getConcurrentWritingTimePartition() * diff;
- CONFIG.setMaxMemtableNumber(maxMemTableNum);
- notifyAll();
- }
-
public synchronized void close() {
currentMemtableNumber = 0;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 010bd9eb6ad..f17dba51030 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
-import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.slf4j.Logger;
@@ -65,7 +64,7 @@ public class SystemInfo {
private int totalFileLimitForCrossTask =
config.getTotalFileLimitForCrossTask();
- private ExecutorService flushTaskSubmitThreadPool =
+ private final ExecutorService flushTaskSubmitThreadPool =
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.FLUSH_TASK_SUBMIT.getName());
private double FLUSH_THRESHOLD = memorySizeForMemtable *
config.getFlushProportion();
private double REJECT_THRESHOLD = memorySizeForMemtable *
config.getRejectProportion();
@@ -100,8 +99,7 @@ public class SystemInfo {
dataRegionInfo.setLastReportedSize(currentDataRegionMemCost);
if (totalStorageGroupMemCost < FLUSH_THRESHOLD) {
return true;
- } else if (totalStorageGroupMemCost >= FLUSH_THRESHOLD
- && totalStorageGroupMemCost < REJECT_THRESHOLD) {
+ } else if (totalStorageGroupMemCost < REJECT_THRESHOLD) {
logger.debug(
"The total database mem costs are too large, call for flushing. "
+ "Current sg cost is {}",
@@ -260,9 +258,6 @@ public class SystemInfo {
public synchronized void resetCompactionMemoryCost(
CompactionTaskType taskType, long compactionMemoryCost) {
- if (!config.isEnableCompactionMemControl()) {
- return;
- }
this.compactionMemoryCost.addAndGet(-compactionMemoryCost);
switch (taskType) {
case INNER_SEQ:
@@ -284,11 +279,7 @@ public class SystemInfo {
}
public long getMemorySizeForCompaction() {
- if (config.isEnableMemControl()) {
- return memorySizeForCompaction;
- } else {
- return Long.MAX_VALUE;
- }
+ return memorySizeForCompaction;
}
public void allocateWriteMemory() {
@@ -350,7 +341,7 @@ public class SystemInfo {
*/
private boolean chooseMemTablesToMarkFlush(TsFileProcessor
currentTsFileProcessor) {
// If invoke flush by replaying logs, do not flush now!
- if (reportedStorageGroupMemCostMap.size() == 0) {
+ if (reportedStorageGroupMemCostMap.isEmpty()) {
return false;
}
PriorityQueue<TsFileProcessor> allTsFileProcessors =
@@ -370,10 +361,7 @@ public class SystemInfo {
TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
selectedTsFileProcessor.setWorkMemTableShouldFlush();
- flushTaskSubmitThreadPool.submit(
- () -> {
- selectedTsFileProcessor.submitAFlushTask();
- });
+
flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask);
if (selectedTsFileProcessor == currentTsFileProcessor) {
isCurrentTsFileProcessorSelected = true;
}
@@ -408,7 +396,7 @@ public class SystemInfo {
private InstanceHolder() {}
- private static SystemInfo instance = new SystemInfo();
+ private static final SystemInfo instance = new SystemInfo();
}
public synchronized void applyTemporaryMemoryForFlushing(long
estimatedTemporaryMemSize) {
@@ -438,8 +426,4 @@ public class SystemInfo {
public double getRejectThershold() {
return REJECT_THRESHOLD;
}
-
- public int flushingMemTableNum() {
- return FlushManager.getInstance().getNumberOfWorkingTasks();
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index d82451eaa0e..a033539cdcb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -46,8 +46,9 @@ public class MemUtils {
private MemUtils() {}
/**
- * function for getting the value size. If mem control enabled, do not add
text data size here,
- * the size will be added to memtable before inserting.
+ * Function for obtaining the value size. For text values, there are two
conditions: 1. During
+ * insertion, their size has already been added to memory. 2. During
flushing, their size needs to
+ * be calculated.
*/
public static long getRecordSize(TSDataType dataType, Object value, boolean
addingTextDataSize) {
if (dataType == TSDataType.TEXT) {
@@ -57,11 +58,10 @@ public class MemUtils {
}
/**
- * function for getting the value size. If mem control enabled, do not add
text data size here,
- * the size will be added to memtable before inserting.
+ * Function for obtaining the value size. For text values, their size has
already been added to
+ * memory before insertion
*/
- public static long getRecordsSize(
- List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
+ public static long getRowRecordSize(List<TSDataType> dataTypes, Object[]
value) {
int emptyRecordCount = 0;
long memSize = 0L;
for (int i = 0; i < value.length; i++) {
@@ -69,28 +69,23 @@ public class MemUtils {
emptyRecordCount++;
continue;
}
- memSize += getRecordSize(dataTypes.get(i - emptyRecordCount), value[i],
addingTextDataSize);
+ memSize += getRecordSize(dataTypes.get(i - emptyRecordCount), value[i],
false);
}
return memSize;
}
/**
- * function for getting the vector value size. If mem control enabled, do
not add text data size
- * here, the size will be added to memtable before inserting.
+ * Function for obtaining the value size. For text values, their size has
already been added to
+ * memory before insertion
*/
- public static long getAlignedRecordsSize(
- List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
+ public static long getAlignedRowRecordSize(List<TSDataType> dataTypes,
Object[] value) {
// time and index size
long memSize = 8L + 4L;
for (int i = 0; i < dataTypes.size(); i++) {
- if (value[i] == null) {
+ if (value[i] == null || dataTypes.get(i) == TSDataType.TEXT) {
continue;
}
- if (dataTypes.get(i) == TSDataType.TEXT) {
- memSize += (addingTextDataSize ? getBinarySize((Binary) value[i]) : 0);
- } else {
- memSize += dataTypes.get(i).getDataTypeSize();
- }
+ memSize += dataTypes.get(i).getDataTypeSize();
}
return memSize;
}
@@ -101,7 +96,7 @@ public class MemUtils {
public static long getBinaryColumnSize(Binary[] column, int start, int end) {
long memSize = 0;
- memSize += (end - start) * RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
+ memSize += (long) (end - start) *
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
for (int i = start; i < end; i++) {
memSize += RamUsageEstimator.sizeOf(column[i].getValues());
}
@@ -109,11 +104,10 @@ public class MemUtils {
}
/**
- * If mem control enabled, do not add text data size here, the size will be
added to memtable
- * before inserting.
+ * Function for obtaining the value size. For text values, their size has
already been added to
+ * memory before insertion
*/
- public static long getTabletSize(
- InsertTabletNode insertTabletNode, int start, int end, boolean
addingTextDataSize) {
+ public static long getTabletSize(InsertTabletNode insertTabletNode, int
start, int end) {
if (start >= end) {
return 0L;
}
@@ -124,19 +118,12 @@ public class MemUtils {
}
// time column memSize
memSize += (end - start) * 8L;
- if (insertTabletNode.getDataTypes()[i] == TSDataType.TEXT &&
addingTextDataSize) {
- for (int j = start; j < end; j++) {
- memSize += getBinarySize(((Binary[])
insertTabletNode.getColumns()[i])[j]);
- }
- } else {
- memSize += (end - start) *
insertTabletNode.getDataTypes()[i].getDataTypeSize();
- }
+ memSize += (long) (end - start) *
insertTabletNode.getDataTypes()[i].getDataTypeSize();
}
return memSize;
}
- public static long getAlignedTabletSize(
- InsertTabletNode insertTabletNode, int start, int end, boolean
addingTextDataSize) {
+ public static long getAlignedTabletSize(InsertTabletNode insertTabletNode,
int start, int end) {
if (start >= end) {
return 0L;
}
@@ -145,16 +132,7 @@ public class MemUtils {
if (insertTabletNode.getMeasurements()[i] == null) {
continue;
}
- TSDataType valueType;
- // value columns memSize
- valueType = insertTabletNode.getDataTypes()[i];
- if (valueType == TSDataType.TEXT && addingTextDataSize) {
- for (int j = start; j < end; j++) {
- memSize += getBinarySize(((Binary[])
insertTabletNode.getColumns()[i])[j]);
- }
- } else {
- memSize += (long) (end - start) * valueType.getDataTypeSize();
- }
+ memSize += (long) (end - start) *
insertTabletNode.getDataTypes()[i].getDataTypeSize();
}
// time and index column memSize for vector
memSize += (end - start) * (8L + 4L);
@@ -162,11 +140,11 @@ public class MemUtils {
}
/** Calculate how much memory will be used if the given record is written to
sequence file. */
- public static long getTsRecordMem(TSRecord record) {
+ public static long getTsRecordMem(TSRecord tsRecord) {
long memUsed = 8; // time
memUsed += 8; // deviceId reference
- memUsed += getStringMem(record.deviceId);
- for (DataPoint dataPoint : record.dataPointList) {
+ memUsed += getStringMem(tsRecord.deviceId);
+ for (DataPoint dataPoint : tsRecord.dataPointList) {
memUsed += 8; // dataPoint reference
memUsed += getDataPointMem(dataPoint);
}
@@ -176,7 +154,7 @@ public class MemUtils {
/** function for getting the memory size of the given string. */
public static long getStringMem(String str) {
// wide char (2 bytes each) and 64B String overhead
- return str.length() * 2 + 64L;
+ return str.length() * 2L + 64L;
}
/** function for getting the memory size of the given data point. */
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 1a1e3b33624..4092db7d442 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -189,7 +189,7 @@ public abstract class AlignedTVList extends TVList {
columnValue != null
? getBinarySize((Binary) columnValue)
: getBinarySize(Binary.EMPTY_VALUE);
- if (memoryBinaryChunkSize[i] >= targetChunkSize) {
+ if (memoryBinaryChunkSize[i] >= TARGET_CHUNK_SIZE) {
reachMaxChunkSizeFlag = true;
}
break;
@@ -767,7 +767,7 @@ public abstract class AlignedTVList extends TVList {
memoryBinaryChunkSize[i] +=
arrayT[elementIndex + i1] != null ?
getBinarySize(arrayT[elementIndex + i1]) : 0;
}
- if (memoryBinaryChunkSize[i] > targetChunkSize) {
+ if (memoryBinaryChunkSize[i] > TARGET_CHUNK_SIZE) {
reachMaxChunkSizeFlag = true;
}
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index d3d21470eb5..4c5d4061530 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -99,7 +99,7 @@ public abstract class BinaryTVList extends TVList {
@Override
public boolean reachMaxChunkSizeThreshold() {
- return memoryBinaryChunkSize >= targetChunkSize;
+ return memoryBinaryChunkSize >= TARGET_CHUNK_SIZE;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 246102bb7eb..f2b26158a5f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -46,10 +46,8 @@ import static
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HE
import static
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
public abstract class TVList implements WALEntryValue {
-
- protected static final int SMALL_ARRAY_LENGTH = 32;
protected static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not
consistent";
- protected static final long targetChunkSize =
+ protected static final long TARGET_CHUNK_SIZE =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
// list of timestamp array, add 1 when expanded -> data point timestamp array
// index relation: arrayIndex -> elementIndex
@@ -93,9 +91,9 @@ public abstract class TVList implements WALEntryValue {
public static long tvListArrayMemCost(TSDataType type) {
long size = 0;
// time array mem size
- size += (long) PrimitiveArrayManager.ARRAY_SIZE * 8L;
+ size += PrimitiveArrayManager.ARRAY_SIZE * 8L;
// value array mem size
- size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long)
type.getDataTypeSize();
+ size += PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize();
// two array headers mem size
size += NUM_BYTES_ARRAY_HEADER * 2L;
// Object references size in ArrayList
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
index 38c6ed49580..79974cc7f7c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
@@ -106,7 +106,6 @@ public class CompactionSchedulerTest {
.getConfig()
.setInnerUnseqCompactionPerformer(InnerUnseqCompactionPerformer.READ_POINT);
IoTDBDescriptor.getInstance().getConfig().setMinCrossCompactionUnseqFileLevel(0);
-
IoTDBDescriptor.getInstance().getConfig().setEnableCompactionMemControl(false);
CompactionTaskManager.getInstance().start();
while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
try {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionConfigRestorer.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionConfigRestorer.java
index a280ae220f8..a7858b9dc6a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionConfigRestorer.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionConfigRestorer.java
@@ -58,9 +58,6 @@ public class CompactionConfigRestorer {
private int oldMinCrossCompactionUnseqLevel =
IoTDBDescriptor.getInstance().getConfig().getMinCrossCompactionUnseqFileLevel();
- private boolean oldEnableCompactionMemControl =
- IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl();
-
public CompactionConfigRestorer() {}
public void restoreCompactionConfig() {
@@ -86,6 +83,5 @@ public class CompactionConfigRestorer {
config.setInnerSeqCompactionPerformer(oldInnerSeqPerformer);
config.setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);
config.setMinCrossCompactionUnseqFileLevel(oldMinCrossCompactionUnseqLevel);
- config.setEnableCompactionMemControl(oldEnableCompactionMemControl);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 9766269e420..e63162ed263 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -87,9 +87,6 @@ public class EnvironmentUtils {
private static final long oldSeqTsFileSize = config.getSeqTsFileSize();
private static final long oldUnSeqTsFileSize = config.getUnSeqTsFileSize();
-
- private static final long oldGroupSizeInByte =
config.getMemtableSizeThreshold();
-
private static TConfiguration tConfiguration =
TConfigurationConst.defaultTConfiguration;
public static boolean examinePorts =
@@ -170,7 +167,6 @@ public class EnvironmentUtils {
cleanAllDir();
config.setSeqTsFileSize(oldSeqTsFileSize);
config.setUnSeqTsFileSize(oldUnSeqTsFileSize);
- config.setMemtableSizeThreshold(oldGroupSizeInByte);
}
private static boolean examinePorts() {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
index 7294a1a70fa..fee451265c5 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
@@ -38,6 +38,9 @@ import
org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
public class MemUtilsTest {
@Test
@@ -50,7 +53,44 @@ public class MemUtilsTest {
}
@Test
- public void getRecordSizeWithInsertNodeTest() throws IllegalPathException {
+ public void getRecordSizeWithInsertRowNodeTest() {
+ Object[] row = {1, 2L, 3.0f, 4.0d, new Binary("5",
TSFileConfig.STRING_CHARSET)};
+ List<TSDataType> dataTypes = new ArrayList<>();
+ int sizeSum = 0;
+ dataTypes.add(TSDataType.INT32);
+ sizeSum += 8 + TSDataType.INT32.getDataTypeSize();
+ dataTypes.add(TSDataType.INT64);
+ sizeSum += 8 + TSDataType.INT64.getDataTypeSize();
+ dataTypes.add(TSDataType.FLOAT);
+ sizeSum += 8 + TSDataType.FLOAT.getDataTypeSize();
+ dataTypes.add(TSDataType.DOUBLE);
+ sizeSum += 8 + TSDataType.DOUBLE.getDataTypeSize();
+ dataTypes.add(TSDataType.TEXT);
+ sizeSum += 8;
+ Assert.assertEquals(sizeSum, MemUtils.getRowRecordSize(dataTypes, row));
+ }
+
+ @Test
+ public void getRecordSizeWithInsertAlignedRowNodeTest() {
+ Object[] row = {1, 2L, 3.0f, 4.0d, new Binary("5",
TSFileConfig.STRING_CHARSET)};
+ List<TSDataType> dataTypes = new ArrayList<>();
+ int sizeSum = 0;
+ dataTypes.add(TSDataType.INT32);
+ sizeSum += TSDataType.INT32.getDataTypeSize();
+ dataTypes.add(TSDataType.INT64);
+ sizeSum += TSDataType.INT64.getDataTypeSize();
+ dataTypes.add(TSDataType.FLOAT);
+ sizeSum += TSDataType.FLOAT.getDataTypeSize();
+ dataTypes.add(TSDataType.DOUBLE);
+ sizeSum += TSDataType.DOUBLE.getDataTypeSize();
+ dataTypes.add(TSDataType.TEXT);
+ // time and index size
+ sizeSum += 8 + 4;
+ Assert.assertEquals(sizeSum, MemUtils.getAlignedRowRecordSize(dataTypes,
row));
+ }
+
+ @Test
+ public void getRecordSizeWithInsertTableNodeTest() throws
IllegalPathException {
PartialPath device = new PartialPath("root.sg.d1");
String[] measurements = {"s1", "s2", "s3", "s4", "s5"};
Object[] columns = {
@@ -83,13 +123,52 @@ public class MemUtilsTest {
null,
columns,
1);
- Assert.assertEquals(sizeSum, MemUtils.getTabletSize(insertNode, 0, 1,
false));
+ Assert.assertEquals(sizeSum, MemUtils.getTabletSize(insertNode, 0, 1));
+ }
+
+ @Test
+ public void getRecordSizeWithInsertAlignedTableNodeTest() throws
IllegalPathException {
+ PartialPath device = new PartialPath("root.sg.d1");
+ String[] measurements = {"s1", "s2", "s3", "s4", "s5"};
+ Object[] columns = {
+ new int[] {1},
+ new long[] {2},
+ new float[] {3},
+ new double[] {4},
+ new Binary[] {new Binary("5", TSFileConfig.STRING_CHARSET)}
+ };
+ TSDataType[] dataTypes = new TSDataType[6];
+ int sizeSum = 0;
+ dataTypes[0] = TSDataType.INT32;
+ sizeSum += TSDataType.INT32.getDataTypeSize();
+ dataTypes[1] = TSDataType.INT64;
+ sizeSum += TSDataType.INT64.getDataTypeSize();
+ dataTypes[2] = TSDataType.FLOAT;
+ sizeSum += TSDataType.FLOAT.getDataTypeSize();
+ dataTypes[3] = TSDataType.DOUBLE;
+ sizeSum += TSDataType.DOUBLE.getDataTypeSize();
+ dataTypes[4] = TSDataType.TEXT;
+ sizeSum += TSDataType.TEXT.getDataTypeSize();
+ // time and index size
+ sizeSum += 8 + 4;
+ InsertTabletNode insertNode =
+ new InsertTabletNode(
+ new PlanNodeId(""),
+ device,
+ true,
+ measurements,
+ dataTypes,
+ new long[1],
+ null,
+ columns,
+ 1);
+ Assert.assertEquals(sizeSum, MemUtils.getAlignedTabletSize(insertNode, 0,
1));
}
/** This method tests MemUtils.getStringMem() and MemUtils.getDataPointMem()
*/
@Test
public void getMemSizeTest() {
- int totalSize = 0;
+ long totalSize = 0;
String device = "root.sg.d1";
TSRecord record = new TSRecord(0, device);
@@ -123,7 +202,7 @@ public class MemUtilsTest {
totalSize += MemUtils.getDataPointMem(point6);
record.addTuple(point6);
- totalSize += 8 * record.dataPointList.size() +
MemUtils.getStringMem(device) + 16;
+ totalSize += 8L * record.dataPointList.size() +
MemUtils.getStringMem(device) + 16;
Assert.assertEquals(totalSize, MemUtils.getTsRecordMem(record));
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 43d431a7a2d..6bfdac3b3d0 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -168,10 +168,6 @@ data_replication_factor=1
### Memory Control Configuration
####################
-# Whether to enable memory control
-# Datatype: boolean
-# enable_mem_control=true
-
# Memory Allocation Ratio: StorageEngine, QueryEngine, SchemaEngine,
Consensus, StreamingEngine and Free Memory.
# The parameter form is a:b:c:d:e:f, where a, b, c, d, e and f are integers.
for example: 1:1:1:1:1:1 , 6:2:1:1:1:1
# If you have high level of writing pressure and low level of reading
pressure, please adjust it to for example 6:1:1:1:1:1
@@ -190,12 +186,6 @@ data_replication_factor=1
# TimePartitionInfo is the total memory size of last flush time of all data
regions
# write_memory_proportion=19:1
-# Max number of concurrent writing time partitions in one database
-# This parameter is used to control total memTable number when memory control
is disabled
-# The max number of memTable is 4 * concurrent_writing_time_partition *
database number
-# Datatype: long
-# concurrent_writing_time_partition=1
-
# primitive array size (length of each array) in array pool
# Datatype: int
# primitive_array_size=64
@@ -479,11 +469,6 @@ data_replication_factor=1
# 2. SHUTDOWN: the system will be shutdown.
# handle_system_error=CHANGE_TO_READ_ONLY
-# Only take effects when enable_mem_control is false.
-# When a memTable's size (in byte) exceeds this, the memtable is flushed to
disk. The default threshold is 1 GB.
-# Datatype: long
-# memtable_size_threshold=1073741824
-
# Whether to timed flush sequence tsfiles' memtables.
# Datatype: boolean
# enable_timed_flush_seq_memtable=true
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 391426cc34f..0505205bcb2 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -85,7 +85,6 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws
IOException {
this(file, true);
this.maxMetadataSize = maxMetadataSize;
- this.enableMemoryControl = true;
this.chunkMetadataTempFile = new File(file.getAbsolutePath() +
CHUNK_METADATA_TEMP_FILE_SUFFIX);
this.checkMetadataSizeAndMayFlush();
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 658a16deda6..9295077bf58 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -57,7 +57,6 @@ import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -111,7 +110,6 @@ public class TsFileIOWriter implements AutoCloseable {
protected volatile boolean hasChunkMetadataInDisk = false;
// record the total num of path in order to make bloom filter
protected int pathCount = 0;
- protected boolean enableMemoryControl = false;
private Path lastSerializePath = null;
protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
private volatile int chunkMetadataCount = 0;
@@ -151,10 +149,8 @@ public class TsFileIOWriter implements AutoCloseable {
}
/** for write with memory control */
- public TsFileIOWriter(File file, boolean enableMemoryControl, long
maxMetadataSize)
- throws IOException {
+ public TsFileIOWriter(File file, long maxMetadataSize) throws IOException {
this(file);
- this.enableMemoryControl = enableMemoryControl;
this.maxMetadataSize = maxMetadataSize;
chunkMetadataTempFile = new File(file.getAbsolutePath() +
CHUNK_METADATA_TEMP_FILE_SUFFIX);
}
@@ -306,9 +302,7 @@ public class TsFileIOWriter implements AutoCloseable {
/** end chunk and write some log. */
public void endCurrentChunk() {
- if (enableMemoryControl) {
- this.currentChunkMetadataSize +=
currentChunkMetadata.getRetainedSizeInBytes();
- }
+ this.currentChunkMetadataSize +=
currentChunkMetadata.getRetainedSizeInBytes();
chunkMetadataCount++;
chunkMetadataList.add(currentChunkMetadata);
currentChunkMetadata = null;
@@ -508,40 +502,6 @@ public class TsFileIOWriter implements AutoCloseable {
this.file = file;
}
- /** Remove such ChunkMetadata that its startTime is not in chunkStartTimes */
- public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
- Map<Path, Integer> startTimeIdxes = new HashMap<>();
- chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
-
- Iterator<ChunkGroupMetadata> chunkGroupMetaDataIterator =
chunkGroupMetadataList.iterator();
- while (chunkGroupMetaDataIterator.hasNext()) {
- ChunkGroupMetadata chunkGroupMetaData =
chunkGroupMetaDataIterator.next();
- String deviceId = chunkGroupMetaData.getDevice();
- int chunkNum = chunkGroupMetaData.getChunkMetadataList().size();
- Iterator<ChunkMetadata> chunkMetaDataIterator =
- chunkGroupMetaData.getChunkMetadataList().iterator();
- while (chunkMetaDataIterator.hasNext()) {
- IChunkMetadata chunkMetaData = chunkMetaDataIterator.next();
- Path path = new Path(deviceId, chunkMetaData.getMeasurementUid(),
true);
- int startTimeIdx = startTimeIdxes.get(path);
-
- List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
- boolean chunkValid =
- startTimeIdx < pathChunkStartTimes.size()
- && pathChunkStartTimes.get(startTimeIdx) ==
chunkMetaData.getStartTime();
- if (!chunkValid) {
- chunkMetaDataIterator.remove();
- chunkNum--;
- } else {
- startTimeIdxes.put(path, startTimeIdx + 1);
- }
- }
- if (chunkNum == 0) {
- chunkGroupMetaDataIterator.remove();
- }
- }
- }
-
public void writePlanIndices() throws IOException {
ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE,
out.wrapAsStream());
ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream());
@@ -630,7 +590,7 @@ public class TsFileIOWriter implements AutoCloseable {
*/
public int checkMetadataSizeAndMayFlush() throws IOException {
// This function should be called after all data of an aligned device has
been written
- if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
+ if (currentChunkMetadataSize > maxMetadataSize) {
try {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -700,7 +660,7 @@ public class TsFileIOWriter implements AutoCloseable {
// for each device, we only serialize it once, in order to save io
writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(),
tempOutput.wrapAsStream());
}
- if (isNewPath && iChunkMetadataList.size() > 0) {
+ if (isNewPath && !iChunkMetadataList.isEmpty()) {
// serialize the public info of this measurement
writtenSize +=
ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(),
tempOutput.wrapAsStream());
@@ -719,10 +679,6 @@ public class TsFileIOWriter implements AutoCloseable {
return writtenSize;
}
- public String getCurrentChunkGroupDeviceId() {
- return currentChunkGroupDeviceId;
- }
-
public List<ChunkGroupMetadata> getChunkGroupMetadataList() {
return chunkGroupMetadataList;
}
diff --git
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
index b130c7a0c58..949a9c1583e 100644
---
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
+++
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -95,7 +95,7 @@ public class TsFileIOWriterMemoryControlTest {
/** The following tests is for ChunkMetadata serialization and
deserialization. */
@Test
public void testSerializeAndDeserializeChunkMetadata() throws IOException {
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 *
1024 * 10)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 *
10)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
@@ -147,7 +147,7 @@ public class TsFileIOWriterMemoryControlTest {
@Test
public void testSerializeAndDeserializeAlignedChunkMetadata() throws
IOException {
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 *
1024 * 10)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 *
10)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
@@ -185,7 +185,7 @@ public class TsFileIOWriterMemoryControlTest {
@Test
public void testSerializeAndDeserializeMixedChunkMetadata() throws
IOException {
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 *
1024 * 10)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 *
10)) {
List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
List<String> seriesIds = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
@@ -258,7 +258,7 @@ public class TsFileIOWriterMemoryControlTest {
@Test
public void testWriteCompleteFileWithNormalChunk() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originData = new HashMap<>();
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
@@ -311,7 +311,7 @@ public class TsFileIOWriterMemoryControlTest {
@Test
public void testWriteCompleteFileWithMultipleNormalChunk() throws
IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originData = new HashMap<>();
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
@@ -398,7 +398,7 @@ public class TsFileIOWriterMemoryControlTest {
@Test
public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile()
throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originData = new HashMap<>();
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
@@ -490,7 +490,7 @@ public class TsFileIOWriterMemoryControlTest {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originData = new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 2; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
@@ -581,7 +581,7 @@ public class TsFileIOWriterMemoryControlTest {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originTimes = new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 1;
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 2; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
@@ -672,7 +672,7 @@ public class TsFileIOWriterMemoryControlTest {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originTimes = new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 1024; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
@@ -762,7 +762,7 @@ public class TsFileIOWriterMemoryControlTest {
@Test
public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws
IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originData = new HashMap<>();
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
@@ -798,7 +798,7 @@ public class TsFileIOWriterMemoryControlTest {
public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws
IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originData = new HashMap<>();
int chunkNum = 512, seriesNum = 6;
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 1; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
@@ -840,7 +840,7 @@ public class TsFileIOWriterMemoryControlTest {
long originTestPointNum = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
try {
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
@@ -881,7 +881,7 @@ public class TsFileIOWriterMemoryControlTest {
TEST_CHUNK_SIZE = 10;
int deviceNum = 1024;
try {
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < deviceNum; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
@@ -918,7 +918,7 @@ public class TsFileIOWriterMemoryControlTest {
public void testWritingAlignedSeriesByColumnWithMultiComponents() throws
IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originValue = new HashMap<>();
TEST_CHUNK_SIZE = 10;
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
@@ -976,7 +976,7 @@ public class TsFileIOWriterMemoryControlTest {
@Test
public void testWritingCompleteMixedFiles() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originData = new HashMap<>();
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < 10; ++k) {
@@ -1075,7 +1075,7 @@ public class TsFileIOWriterMemoryControlTest {
@Test
public void testWritingAlignedSeriesByColumn() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originValue = new HashMap<>();
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
@@ -1129,7 +1129,7 @@ public class TsFileIOWriterMemoryControlTest {
@Test
public void testWritingAlignedSeriesByColumnWithMultiChunks() throws
IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originValue = new HashMap<>();
- try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);