This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev_sliding_mem_table by this
push:
new cc039ef finish sliding mem table version 3
cc039ef is described below
commit cc039ef01773adb604c346f613bb1905dc4f5dae
Author: EJTTianyu <[email protected]>
AuthorDate: Tue Dec 8 10:19:32 2020 +0800
finish sliding mem table version 3
---
.../java/org/apache/iotdb/tsfile/TsFileSequenceRead.java | 2 +-
.../src/assembly/resources/conf/iotdb-engine.properties | 6 +++---
.../org/apache/iotdb/db/engine/flush/FlushManager.java | 15 +++++++++------
.../db/engine/storagegroup/StorageGroupProcessor.java | 12 +++++++++---
.../iotdb/db/engine/storagegroup/TsFileProcessor.java | 4 ++--
5 files changed, 24 insertions(+), 15 deletions(-)
diff --git
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index 4cbf862..da5bae6 100644
---
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -41,7 +41,7 @@ public class TsFileSequenceRead {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
public static void main(String[] args) throws IOException {
- String filename =
"/Users/tianyu/2019秋季学期/iotdb/server/target/data/sequence/root.vehicle.d0/0/1607075644401-1-0.tsfile";
+ String filename = "test.tsfile";
if (args.length >= 1) {
filename = args[0];
}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 1f296ef..6137629 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -232,9 +232,6 @@ primitive_array_size=128
# Ratio of write memory for invoking flush disk, 0.3 by default
flush_proportion=0.3
-# Ratio of write memory for invoking flush immediately, 0.6 by default
-force_flush_proportion=0.6
-
# Ratio of write memory allocated for buffered arrays, 0.6 by default
buffered_arrays_memory_proportion=0.6
@@ -267,6 +264,9 @@ enable_sliding_mem_table=true
# Save the flushing memtable in the memory during the period, can help reduce
the unseq ratio, Unit: millis.
flush_wait_time=60000
+# Ratio of write memory for invoking flush immediately, avoid OOM, 0.6 by
default
+force_flush_proportion=0.6
+
####################
### Upgrade Configurations
####################
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index b102d03..2672f47 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -92,9 +92,9 @@ public class FlushManager implements FlushManagerMBean,
IService {
if (tsFileProcessor.isSequence() &&
IoTDBDescriptor.getInstance().getConfig()
.isEnableSlidingMemTable() &&
!SystemInfo.getInstance().forceFlush()) {
long startTime = System.currentTimeMillis();
- while (!tsFileProcessor.isShouldClose()
- && System.currentTimeMillis() - startTime <
IoTDBDescriptor.getInstance().getConfig()
- .getFlushWaitTime()) {
+ while (!tsFileProcessor.isShouldClose() &&
+ System.currentTimeMillis() - startTime <
IoTDBDescriptor.getInstance().getConfig()
+ .getFlushWaitTime()) {
// wait
try {
TimeUnit.MILLISECONDS
@@ -104,9 +104,12 @@ public class FlushManager implements FlushManagerMBean,
IService {
}
}
}
- tsFileProcessor.setFlushingMemTable(null);
- tsFileProcessor.setFlushMemTableAlive(false);
- tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
+ if (tsFileProcessor.isSequence() &&
IoTDBDescriptor.getInstance().getConfig()
+ .isEnableSlidingMemTable()) {
+
tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
+ tsFileProcessor.setFlushingMemTable(null);
+ tsFileProcessor.setFlushMemTableAlive(false);
+ }
tsFileProcessor.flushOneMemTable();
tsFileProcessor.setManagedByFlushManager(false);
if (logger.isDebugEnabled()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f3896a6..537b36a 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -881,7 +881,8 @@ public class StorageGroupProcessor {
return;
}
- if (sequence && config.isEnableSlidingMemTable()) {
+ // judge whether to insert into flushing mem table or not when flushing
mem table is alive
+ if (tsFileProcessor.isFlushMemTableAlive()) {
insertRowPlan
.setToFlushingMemTable(isInsertToFlushingMemTable(timePartitionId,
insertRowPlan));
}
@@ -1668,17 +1669,22 @@ public class StorageGroupProcessor {
}
if (processor.isFlushMemTableAlive()) {
+ // if flushing mem table is alive, use latestTimeForEachDevice to
update flushingLatestTimeForEachDevice
for (Entry<String, Long> entry :
curPartitionDeviceLatestTime.entrySet()) {
flushingLatestTimeForEachDevice
.computeIfAbsent(processor.getTimeRangeId(), id -> new
HashMap<>())
.put(entry.getKey(), entry.getValue());
}
+ // when the processor is closing, update
partitionLatestFlushedTimeForEachDevice
if (processor.isUpdateLatestTime()) {
updateLatestTime(processor, curPartitionDeviceLatestTime);
}
} else {
- curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
- .get(processor.getTimeRangeId());
+ // is sliding window is enabled, use flushingLatestTimeForEachDevice
to update
+ if (config.isEnableSlidingMemTable()) {
+ curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+ .get(processor.getTimeRangeId());
+ }
updateLatestTime(processor, curPartitionDeviceLatestTime);
}
} finally {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a4d0a0a..e982933 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -605,11 +605,11 @@ public class TsFileProcessor {
*/
public void asyncFlush() {
if (config.isEnableSlidingMemTable()){
- while (isManagedByFlushManager() && flushingMemTable != null) {
+ while (flushingMemTable != null) {
try {
TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("async flush failed because the flushing mem table is
still alive", e);
}
}
}