This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch proceeding_vldb
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/proceeding_vldb by this push:
new 5bcdbb2 finish sliding windows
5bcdbb2 is described below
commit 5bcdbb2883c543cfb89420d2686b3711c705dbca
Author: EJTTianyu <[email protected]>
AuthorDate: Mon Feb 22 10:52:05 2021 +0800
finish sliding windows
---
.../resources/conf/iotdb-engine.properties | 10 +++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 ++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +++
.../engine/storagegroup/StorageGroupProcessor.java | 72 ++++++++++++++++++++--
.../db/engine/storagegroup/TsFileProcessor.java | 19 +++++-
5 files changed, 126 insertions(+), 7 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 1b9f9de..931d7d5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -283,6 +283,16 @@ encoding_task_queue_size_for_flushing=2147483647
io_task_queue_size_for_flushing=2147483647
####################
+### Sliding Memory Table Configurations
+####################
+
+# Whether to enable sliding memory table
+enable_sliding_mem_table=true
+
+# Sliding window threshold in bytes. Default: 16 MB (16777216)
+sliding_window_threshold=16777216
+
+####################
### Upgrade Configurations
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c4be6f7..232a50b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -70,10 +70,34 @@ public class IoTDBConfig {
this.overlapSplit = overlapSplit;
}
+ public boolean isEnableSlidingMemTable() {
+ return enableSlidingMemTable;
+ }
+
+ public void setEnableSlidingMemTable(boolean enableSlidingMemTable) {
+ this.enableSlidingMemTable = enableSlidingMemTable;
+ }
+
+ public int getSlidingWindowThreshold() {
+ return slidingWindowThreshold;
+ }
+
+ public void setSlidingWindowThreshold(int slidingWindowThreshold) {
+ this.slidingWindowThreshold = slidingWindowThreshold;
+ }
+
/**
* storage engine configurations
*/
private boolean overlapSplit = true;
+ /**
+ * Whether to enable sliding memory table
+ */
+ private boolean enableSlidingMemTable = true;
+ /**
+ * Sliding window threshold in bytes. Default: 16 MB
+ */
+ private int slidingWindowThreshold = 16 * 1024 * 1024;
/**
* Port which the metrics service listens to.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3c6e703..36457f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -322,6 +322,14 @@ public class IoTDBDescriptor {
properties.getProperty("enable_unseq_compaction",
Boolean.toString(conf.isEnableUnseqCompaction()))));
+ conf.setEnableSlidingMemTable(Boolean.parseBoolean(properties
+ .getProperty("enable_sliding_mem_table",
+ Boolean.toString(conf.isEnableSlidingMemTable()))));
+
+ conf.setSlidingWindowThreshold(Integer.parseInt(properties
+ .getProperty("sliding_window_threshold",
+ Integer.toString(conf.getSlidingWindowThreshold()))));
+
conf.setFirstLevelFileNum(Integer.parseInt(properties.getProperty("first_level_file_num",
Integer.toString(conf.getFirstLevelFileNum()))));
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 7d3519e..5dceb6a 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -164,6 +164,10 @@ public class StorageGroupProcessor {
*/
protected final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors
= new TreeMap<>();
/**
+ * time partition id in the storage group -> flushingTsFileProcessor for
this time partition
+ */
+ protected final TreeMap<Long, TsFileProcessor>
flushingSequenceTsFileProcessors = new TreeMap<>();
+ /**
* time partition id in the storage group -> tsFileProcessor for this time
partition
*/
protected final TreeMap<Long, TsFileProcessor>
workUnsequenceTsFileProcessors = new TreeMap<>();
@@ -201,6 +205,12 @@ public class StorageGroupProcessor {
*/
protected Map<Long, Map<String, Long>>
partitionLatestFlushedTimeForEachDevice = new HashMap<>();
+ /**
+ * time partition id -> map. When a memory table is marked to be flushed,
latestTimeForEachDevice
+ * is copied into flushingLatestTimeForEachDevice, which in turn is used to update
partitionLatestFlushedTimeForEachDevice
+ * when the flush is actually issued.
+ */
+ protected Map<Long, Map<String, Long>> flushingLatestTimeForEachDevice = new
HashMap<>();
/**
* used to record the latest flush time while upgrading and inserting
*/
@@ -692,6 +702,10 @@ public class StorageGroupProcessor {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+ if (config.isEnableSlidingMemTable()) {
+ flushingLatestTimeForEachDevice.computeIfAbsent(timePartitionId, l ->
new HashMap<>());
+ }
+
boolean isSequence =
insertRowPlan.getTime() >
partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
.getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE);
@@ -913,7 +927,18 @@ public class StorageGroupProcessor {
return;
}
- tsFileProcessor.insert(insertRowPlan);
+ TsFileProcessor flushingProcessor =
flushingSequenceTsFileProcessors.get(timePartitionId);
+ boolean toFlushingProcessor = false;
+
+ if (config.isEnableSlidingMemTable() && sequence &&
isInsertToFlushingMemTable(timePartitionId,
+ insertRowPlan)) {
+ toFlushingProcessor = true;
+ }
+ if (toFlushingProcessor) {
+ flushingProcessor.insert(insertRowPlan);
+ } else {
+ tsFileProcessor.insert(insertRowPlan);
+ }
// try to update the latest time of the device of this tsRecord
if (latestTimeForEachDevice.get(timePartitionId)
@@ -928,12 +953,52 @@ public class StorageGroupProcessor {
tryToUpdateInsertLastCache(insertRowPlan, globalLatestFlushTime);
+ if (config.isEnableSlidingMemTable() && sequence) {
+ if (flushingProcessor == null && tsFileProcessor.shouldFlush()) {
+ flushingSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
+ Map<String, Long> curPartitionDeviceLatestTime =
latestTimeForEachDevice
+ .get(timePartitionId);
+ if (curPartitionDeviceLatestTime != null) {
+ for (Entry<String, Long> entry :
curPartitionDeviceLatestTime.entrySet()) {
+ flushingLatestTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ }
+ }
+ workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
+ // if unsequence files don't contain this time range id, we should
remove its version controller
+ if
(!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId()))
{
+
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
+ }
+ } else if (flushingProcessor != null &&
tsFileProcessor.shouldCloseFlushing()) {
+ fileFlushPolicy.apply(this, flushingProcessor, sequence);
+ flushingSequenceTsFileProcessors.put(timePartitionId, null);
+ Map<String, Long> curPartitionDeviceLatestTime =
flushingLatestTimeForEachDevice
+ .get(timePartitionId);
+ if (curPartitionDeviceLatestTime != null) {
+ for (Entry<String, Long> entry :
curPartitionDeviceLatestTime.entrySet()) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ return;
+ }
// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
}
}
+ /**
+ * Judge whether an insert plan should be inserted into the flushing memtable
+ */
+ private boolean isInsertToFlushingMemTable(long timePartitionId,
InsertRowPlan insertRowPlan){
+ return flushingLatestTimeForEachDevice.get(timePartitionId).
+ getOrDefault(insertRowPlan.getDeviceId().getFullPath(),
Long.MIN_VALUE) >= insertRowPlan.getTime();
+ }
+
protected void tryToUpdateInsertLastCache(InsertRowPlan plan, Long
latestFlushedTime) {
if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
return;
@@ -1148,11 +1213,6 @@ public class StorageGroupProcessor {
updateEndTimeMap(tsFileProcessor);
tsFileProcessor.asyncClose();
- workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
- // if unsequence files don't contain this time range id, we should
remove it's version controller
- if
(!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId()))
{
-
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
- }
logger.info("close a sequence tsfile processor {}", storageGroupName);
} else {
closingUnSequenceTsFileProcessor.add(tsFileProcessor);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c3ba768..4735996 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -421,7 +421,24 @@ public class TsFileProcessor {
workMemTable.memSize(),
tsFileResource.getTsFile().getAbsolutePath());
return true;
}
- if (!enableMemControl && workMemTable.memSize() >=
getMemtableSizeThresholdBasedOnSeriesNum()) {
+ if (workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) {
+ logger.info("The memtable size {} of tsfile {} reaches the threshold",
+ workMemTable.memSize(),
tsFileResource.getTsFile().getAbsolutePath());
+ return true;
+ }
+ return false;
+ }
+
+ public boolean shouldCloseFlushing() {
+ if (workMemTable == null) {
+ return false;
+ }
+ if (shouldFlush) {
+ logger.info("The memtable size {} of tsfile {} reaches the mem control
threshold",
+ workMemTable.memSize(),
tsFileResource.getTsFile().getAbsolutePath());
+ return true;
+ }
+ if (workMemTable.memSize() >= config.getSlidingWindowThreshold()) {
logger.info("The memtable size {} of tsfile {} reaches the threshold",
workMemTable.memSize(),
tsFileResource.getTsFile().getAbsolutePath());
return true;