This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev_sliding_mem_table by this
push:
new 4ed7dd8 finish sliding window version 2
4ed7dd8 is described below
commit 4ed7dd87b4dbddd2b3380956f9bbdb86ffdb62a1
Author: EJTTianyu <[email protected]>
AuthorDate: Mon Dec 7 15:38:44 2020 +0800
finish sliding window version 2
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +--
.../engine/storagegroup/StorageGroupProcessor.java | 35 ++++++++++--------
.../db/engine/storagegroup/TsFileProcessor.java | 41 ++++++++++++----------
3 files changed, 45 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a337152..13e9d3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -210,9 +210,9 @@ public class IoTDBConfig {
private int estimatedSeriesSize = 300;
/**
- * Whether to enable sliding memory table
+ * Whether to enable the sliding memory table. Defaults to false to reduce CI time.
*/
- private boolean enableSlidingMemTable = true;
+ private boolean enableSlidingMemTable = false;
/**
* Save the flushing memtable in the memory during the period, can help
reduce the unseq ratio, Unit: millis.
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b431823..f3896a6 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1673,22 +1673,13 @@ public class StorageGroupProcessor {
.computeIfAbsent(processor.getTimeRangeId(), id -> new
HashMap<>())
.put(entry.getKey(), entry.getValue());
}
- } else {
- if (processor.isSequence()) {
- curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
- .get(processor.getTimeRangeId());
- }
- for (Entry<String, Long> entry :
curPartitionDeviceLatestTime.entrySet()) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(processor.getTimeRangeId(), id -> new
HashMap<>())
- .put(entry.getKey(), entry.getValue());
-
updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
- entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice
- .getOrDefault(entry.getKey(), Long.MIN_VALUE) <
entry.getValue()) {
- globalLatestFlushedTimeForEachDevice.put(entry.getKey(),
entry.getValue());
- }
+ if (processor.isUpdateLatestTime()) {
+ updateLatestTime(processor, curPartitionDeviceLatestTime);
}
+ } else {
+ curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+ .get(processor.getTimeRangeId());
+ updateLatestTime(processor, curPartitionDeviceLatestTime);
}
} finally {
flushUpdateUnLock();
@@ -1696,6 +1687,20 @@ public class StorageGroupProcessor {
return true;
}
+ private void updateLatestTime(TsFileProcessor processor, Map<String, Long>
curPartitionDeviceLatestTime){
+ for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+
updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+ entry.getKey(), entry.getValue());
+ if (globalLatestFlushedTimeForEachDevice
+ .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+ globalLatestFlushedTimeForEachDevice.put(entry.getKey(),
entry.getValue());
+ }
+ }
+ }
+
/**
* used for upgrading
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 32dec07..a4d0a0a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -111,6 +111,7 @@ public class TsFileProcessor {
private IMemTable workMemTable;
private boolean isFlushMemTableAlive = false;
+ private boolean updateLatestTime = false;
private final VersionController versionController;
@@ -530,9 +531,6 @@ public class TsFileProcessor {
// we have to add the memtable into flushingList first and then set the
shouldClose tag.
// see https://issues.apache.org/jira/browse/IOTDB-510
-// IMemTable tmpFlushMemTable = flushingMemTable == null ||
flushingMemTable.memSize() == 0
-// ? new NotifyFlushMemTable()
-// : flushingMemTable;
IMemTable tmpWorkMemTable = workMemTable == null ||
workMemTable.memSize() == 0
? new NotifyFlushMemTable()
: workMemTable;
@@ -540,8 +538,7 @@ public class TsFileProcessor {
try {
// When invoke closing TsFile after insert data to memTable, we
shouldn't flush until invoke
// flushing memTable in System module.
-// addAMemtableIntoFlushingList(tmpFlushMemTable);
- addAMemtableIntoFlushingList(tmpWorkMemTable);
+ addAMemtableIntoFlushingList(tmpWorkMemTable, true);
shouldClose = true;
tsFileResource.setCloseFlag();
} catch (Exception e) {
@@ -569,16 +566,12 @@ public class TsFileProcessor {
.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName,
tsFileResource.getTsFile().getName());
}
try {
- if (flushingMemTable == null) {
- tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() :
workMemTable;
- } else {
- tmpMemTable = flushingMemTable;
- }
+ tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() :
workMemTable;
if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
logger.debug("{}: {} add a signal memtable into flushing memtable list
when sync flush",
storageGroupName, tsFileResource.getTsFile().getName());
}
- addAMemtableIntoFlushingList(tmpMemTable);
+ addAMemtableIntoFlushingList(tmpMemTable, false);
} finally {
flushQueryLock.writeLock().unlock();
if (logger.isDebugEnabled()) {
@@ -611,6 +604,15 @@ public class TsFileProcessor {
* put the working memtable into flushing list and set the working memtable
to null
*/
public void asyncFlush() {
+ if (config.isEnableSlidingMemTable()){
+ while (isManagedByFlushManager() && flushingMemTable != null) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
logger
@@ -622,12 +624,8 @@ public class TsFileProcessor {
}
logger.info("Async flush a memtable to tsfile: {}",
tsFileResource.getTsFile().getAbsolutePath());
- if (config.isEnableSlidingMemTable()){
- while (flushingMemTable != null) {
- TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
- }
- }
- addAMemtableIntoFlushingList(workMemTable);
+
+ addAMemtableIntoFlushingList(workMemTable, false);
} catch (Exception e) {
logger.error("{}: {} add a memtable into flushing list failed",
storageGroupName,
tsFileResource.getTsFile().getName(), e);
@@ -645,7 +643,7 @@ public class TsFileProcessor {
* queue, set the current working memtable as null and then register the
tsfileProcessor into the
* flushManager again.
*/
- private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws
IOException {
+ private void addAMemtableIntoFlushingList(IMemTable tobeFlushed, boolean
isClosing) throws IOException {
if (!tobeFlushed.isSignalMemTable() && tobeFlushed.memSize() == 0) {
logger.warn("This normal memtable is empty, skip it in flush. {}: {}
Memetable info: {}",
storageGroupName, tsFileResource.getTsFile().getName(),
tobeFlushed.getMemTableMap());
@@ -673,6 +671,9 @@ public class TsFileProcessor {
flushingMemTable = workMemTable;
isFlushMemTableAlive = true;
}
+ if(isClosing){
+ updateLatestTime = true;
+ }
updateLatestFlushTimeCallback.call(this);
workMemTable = null;
shouldFlush = false;
@@ -1049,4 +1050,8 @@ public class TsFileProcessor {
public boolean isShouldClose() {
return shouldClose;
}
+
+ public boolean isUpdateLatestTime() {
+ return updateLatestTime;
+ }
}