This is an automated email from the ASF dual-hosted git repository.
spricoder pushed a commit to branch research/deferredflushing
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/deferredflushing by
this push:
new c10e124903a [Deferred Flushing] Implementation of deferred flushing
strategy. (#14724)
c10e124903a is described below
commit c10e124903a2ff38ff13770d8b9d93ec7305fade
Author: DeferredFlushing <[email protected]>
AuthorDate: Sat Jan 18 19:20:32 2025 +0800
[Deferred Flushing] Implementation of deferred flushing strategy. (#14724)
Co-authored-by: DeferredFlushing <>
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../db/storageengine/dataregion/DataRegion.java | 8 +-
.../dataregion/flush/MemTableFlushTask.java | 25 ++++
.../dataregion/memtable/AbstractMemTable.java | 71 ++++++++++-
.../memtable/AlignedWritableMemChunk.java | 14 +++
.../memtable/AlignedWritableMemChunkGroup.java | 13 ++
.../dataregion/memtable/IMemTable.java | 9 ++
.../dataregion/memtable/IWritableMemChunk.java | 9 ++
.../memtable/IWritableMemChunkGroup.java | 5 +
.../dataregion/memtable/PrimitiveMemTable.java | 8 ++
.../dataregion/memtable/TsFileProcessor.java | 77 ++++++++++--
.../dataregion/memtable/WritableMemChunk.java | 24 +++-
.../dataregion/memtable/WritableMemChunkGroup.java | 20 +++
.../rescon/memory/PrimitiveArrayManager.java | 1 +
.../db/utils/datastructure/AlignedTVList.java | 36 +++++-
.../iotdb/db/utils/datastructure/BackwardSort.java | 4 +-
.../iotdb/db/utils/datastructure/IntTVList.java | 138 +++++++++++++--------
.../db/utils/datastructure/QuickAlignedTVList.java | 7 +-
.../db/utils/datastructure/QuickBinaryTVList.java | 7 +-
.../db/utils/datastructure/QuickBooleanTVList.java | 7 +-
.../db/utils/datastructure/QuickDoubleTVList.java | 7 +-
.../db/utils/datastructure/QuickFloatTVList.java | 7 +-
.../db/utils/datastructure/QuickIntTVList.java | 7 +-
.../db/utils/datastructure/QuickLongTVList.java | 7 +-
.../iotdb/db/utils/datastructure/QuickSort.java | 6 +-
.../iotdb/db/utils/datastructure/TVList.java | 16 +++
.../db/utils/datastructure/TimAlignedTVList.java | 9 +-
.../db/utils/datastructure/TimBinaryTVList.java | 9 +-
.../db/utils/datastructure/TimBooleanTVList.java | 9 +-
.../db/utils/datastructure/TimDoubleTVList.java | 9 +-
.../db/utils/datastructure/TimFloatTVList.java | 9 +-
.../iotdb/db/utils/datastructure/TimIntTVList.java | 20 +--
.../db/utils/datastructure/TimLongTVList.java | 9 +-
.../iotdb/db/utils/datastructure/TimSort.java | 10 +-
.../TopkDivideMemoryNotEnoughException.java | 8 ++
.../resources/conf/iotdb-common.properties | 17 ++-
37 files changed, 547 insertions(+), 111 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a9ec1ee5f5b..f3d0c7b611d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -410,6 +410,9 @@ public class IoTDBConfig {
/** The sort algorithm used in TVList */
private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;
+ /** The value of primitive topk area size in the tvlist */
+ private int seqMemtableTopKSize = 0;
+
/** When average series point number reaches this, flush the memtable to
disk */
private int avgSeriesPointNumberThreshold = 100000;
@@ -2070,6 +2073,14 @@ public class IoTDBConfig {
this.tvListSortAlgorithm = tvListSortAlgorithm;
}
+ public int getSeqMemtableTopKSize() {
+ return seqMemtableTopKSize;
+ }
+
+ public void setSeqMemtableTopKSize(int seqMemtableTopKSize) {
+ this.seqMemtableTopKSize = seqMemtableTopKSize;
+ }
+
public int getAvgSeriesPointNumberThreshold() {
return avgSeriesPointNumberThreshold;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 9160cb8b0b1..2b728ca5865 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -387,6 +387,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"tvlist_sort_algorithm",
conf.getTvListSortAlgorithm().toString())));
+ conf.setSeqMemtableTopKSize(
+ (Integer.parseInt(
+ properties.getProperty(
+ "seq_memtable_topk_size",
Integer.toString(conf.getSeqMemtableTopKSize())))));
+
conf.setAvgSeriesPointNumberThreshold(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 20cbfef2aee..eff6ddb30b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1391,7 +1391,6 @@ public class DataRegion implements IDataRegionForQuery {
version,
0,
0);
-
return getTsFileProcessor(sequence, filePath, timePartitionId);
}
@@ -1503,6 +1502,7 @@ public class DataRegion implements IDataRegionForQuery {
if
(!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
}
+ logger.info("close an unsequence tsfile processor {}", databaseName +
"-" + dataRegionId);
}
if (workSequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) ==
null
&&
workUnsequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null) {
@@ -1663,7 +1663,8 @@ public class DataRegion implements IDataRegionForQuery {
for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
logger.info(
- "Exceed sequence memtable flush interval, so flush working
memtable of time partition {} in database {}[{}]",
+ "Exceed sequence memtable {} flush interval, so flush working
memtable of time partition {} in database {}[{}]",
+ tsFileProcessor.getWorkMemTable().getMemTableId(),
tsFileProcessor.getTimeRangeId(),
databaseName,
dataRegionId);
@@ -1689,7 +1690,8 @@ public class DataRegion implements IDataRegionForQuery {
for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
logger.info(
- "Exceed unsequence memtable flush interval, so flush working
memtable of time partition {} in database {}[{}]",
+ "Exceed unsequence memtable {} flush interval, so flush working
memtable of time partition {} in database {}[{}]",
+ tsFileProcessor.getWorkMemTable().getMemTableId(),
tsFileProcessor.getTimeRangeId(),
databaseName,
dataRegionId);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index ab5440d5358..9becb3c560c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -110,6 +110,13 @@ public class MemTableFlushTask {
memTable.getSeriesNumber() == 0
? 0
: memTable.getTotalPointsNum() / memTable.getSeriesNumber();
+ LOGGER.info(
+ "The memTable {} size of SG {} is {}, the avg series points num in
chunk is {}, total timeseries number is {}",
+ memTable.getMemTableId(),
+ storageGroup,
+ memTable.memSize(),
+ avgSeriesPointsNum,
+ memTable.getSeriesNumber());
WRITING_METRICS.recordFlushingMemTableStatus(
storageGroup,
memTable.memSize(),
@@ -205,6 +212,13 @@ public class MemTableFlushTask {
MetricLevel.CORE,
Tag.NAME.toString(),
"flush");
+
+ LOGGER.info(
+ "Database {} flushing memtable {} with {} has finished! Time
consumption: {}ms",
+ storageGroup,
+ memTable.getMemTableId(),
+ memTable,
+ System.currentTimeMillis() - start);
}
/** encoding task (second task of pipeline) */
@@ -274,6 +288,17 @@ public class MemTableFlushTask {
databaseName ->
recordFlushPointsMetricInternal(
memTable.getTotalPointsNum(), databaseName,
dataRegionId));
+
+ LOGGER.info(
+ "Database {}, flushing memtable {} with size {} and points {} in
file "
+ + "{} into disk: Encoding data cost "
+ + "{} ms.",
+ storageGroup,
+ memTable.getMemTableId(),
+ memTable.memSize(),
+ memTable.getTotalPointsNum(),
+ writer.getFile().getName(),
+ memSerializeTime);
WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING,
memSerializeTime);
}
};
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index e54e7ef2c36..0bdebb21481 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.MemUtils;
+import
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -88,9 +89,9 @@ public abstract class AbstractMemTable implements IMemTable {
private long minPlanIndex = Long.MAX_VALUE;
- private final long memTableId = memTableIdCounter.incrementAndGet();
+ private long memTableId = memTableIdCounter.incrementAndGet();
- private final long createdTime = System.currentTimeMillis();
+ private long createdTime = System.currentTimeMillis();
/** this time is updated by the timed flush, same as createdTime when the
feature is disabled. */
private long updateTime = createdTime;
@@ -126,6 +127,34 @@ public abstract class AbstractMemTable implements
IMemTable {
this.memTableMap = memTableMap;
}
+ /** create memtable from last memtable, preserve topk points */
+ protected AbstractMemTable(
+ String database,
+ String dataRegionId,
+ Map<IDeviceID, IWritableMemChunkGroup> memTableMap,
+ AbstractMemTable lastMemTable) {
+ this.database = database;
+ this.dataRegionId = dataRegionId;
+ this.memTableMap = memTableMap;
+ for (IDeviceID key : memTableMap.keySet()) {
+ Map<String, IWritableMemChunk> memChunkMap =
memTableMap.get(key).getMemChunkMap();
+ for (String schema : memChunkMap.keySet()) {
+ IWritableMemChunk memChunk = memChunkMap.get(schema);
+ totalPointsNum += memChunk.getTVList().rowCount();
+ memSize +=
+ (long) memChunk.getTVList().rowCount()
+ * memChunk.getSchema().getType().getDataTypeSize();
+ }
+ }
+ totalPointsNumThreshold = lastMemTable.getTotalPointsNumThreshold();
+ createdTime = System.currentTimeMillis();
+ memTableId = memTableIdCounter.incrementAndGet();
+ seriesNumber = lastMemTable.getSeriesNumber();
+ tvListRamCost = lastMemTable.getTVListsRamCost() * totalPointsNum /
lastMemTable.totalPointsNum;
+ shouldFlush = false;
+ flushStatus = FlushStatus.WORKING;
+ }
+
@Override
public Map<IDeviceID, IWritableMemChunkGroup> getMemTableMap() {
return memTableMap;
@@ -439,6 +468,16 @@ public abstract class AbstractMemTable implements
IMemTable {
return memSize;
}
+ @Override
+ public void setMemSize(long m) {
+ memSize = memSize + m;
+ }
+
+ @Override
+ public void setTotalPointsNum(long m) {
+ totalPointsNum = totalPointsNum + m;
+ }
+
@Override
public boolean reachTotalPointNumThreshold() {
if (totalPointsNum == 0) {
@@ -529,6 +568,21 @@ public abstract class AbstractMemTable implements
IMemTable {
}
}
+ @Override
+ public IMemTable divide() throws TopkDivideMemoryNotEnoughException {
+ Map<IDeviceID, IWritableMemChunkGroup> topkMemTableMap = new HashMap<>();
+ for (IDeviceID key : memTableMap.keySet()) {
+ IWritableMemChunkGroup chunkGroupTemp = memTableMap.get(key).divide();
+ topkMemTableMap.put(key, chunkGroupTemp);
+ }
+ PrimitiveMemTable topkMemtable =
+ new PrimitiveMemTable(database, dataRegionId, topkMemTableMap, this);
+ memSize -= topkMemtable.memSize();
+ totalPointsNum -= topkMemtable.getTotalPointsNum();
+ tvListRamCost -= topkMemtable.getTVListsRamCost();
+ return topkMemtable;
+ }
+
@Override
public void addTVListRamCost(long cost) {
this.tvListRamCost += cost;
@@ -689,6 +743,15 @@ public abstract class AbstractMemTable implements
IMemTable {
return latestTimeForEachDevice;
}
+ @Override
+ public Map<String, Long> getTopKTime() {
+ Map<String, Long> latestTimeForEachDevice = new HashMap<>();
+ for (Entry<IDeviceID, IWritableMemChunkGroup> entry :
memTableMap.entrySet()) {
+ latestTimeForEachDevice.put(entry.getKey().toStringID(),
entry.getValue().getTopKTime());
+ }
+ return latestTimeForEachDevice;
+ }
+
public static class Factory {
private Factory() {
// Empty constructor
@@ -734,4 +797,8 @@ public abstract class AbstractMemTable implements IMemTable
{
public boolean isTotallyGeneratedByPipe() {
return this.isTotallyGeneratedByPipe.get();
}
+
+ public long getTotalPointsNumThreshold() {
+ return totalPointsNumThreshold;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 64a56ee3ac5..55ad4849a1b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -253,11 +254,19 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
return null;
}
+ @Override
+ public void setSchema(IMeasurementSchema s) {}
+
@Override
public long getMaxTime() {
return list.getMaxTime();
}
+ @Override
+ public long getTopKTime() {
+ return list.getTopKTime();
+ }
+
@Override
public synchronized TVList getSortedTvListForQuery() {
sortTVList();
@@ -495,6 +504,11 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
.getTimestamp();
}
+ @Override
+ public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException {
+ return new AlignedWritableMemChunk(schemaList, (AlignedTVList)
list.divide());
+ }
+
@Override
public boolean isEmpty() {
return list.rowCount() == 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index 17b12a748da..875c1130692 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternUtil;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -143,6 +144,18 @@ public class AlignedWritableMemChunkGroup implements
IWritableMemChunkGroup {
return memChunk.getMaxTime();
}
+ @Override
+ public long getTopKTime() {
+ return memChunk.getTopKTime();
+ }
+
+ @Override
+ public IWritableMemChunkGroup divide() throws
TopkDivideMemoryNotEnoughException {
+ AlignedWritableMemChunkGroup topkMemChunkGroup = new
AlignedWritableMemChunkGroup();
+ topkMemChunkGroup.memChunk = (AlignedWritableMemChunk) memChunk.divide();
+ return topkMemChunkGroup;
+ }
+
public AlignedWritableMemChunk getAlignedMemChunk() {
return memChunk;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 8e1a883d2a4..fc9a5424df4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -65,6 +66,10 @@ public interface IMemTable extends WALEntryValue {
/** @return memory usage */
long memSize();
+ void setMemSize(long m);
+
+ void setTotalPointsNum(long m);
+
/** only used when mem control enabled */
void addTVListRamCost(long cost);
@@ -132,6 +137,8 @@ public interface IMemTable extends WALEntryValue {
*/
void delete(PartialPath path, PartialPath devicePath, long startTimestamp,
long endTimestamp);
+ /** divide the memtable into the topk and regular ones @ return the topk
memtable. */
+ IMemTable divide() throws TopkDivideMemoryNotEnoughException;
/**
* Make a copy of this MemTable.
*
@@ -185,4 +192,6 @@ public interface IMemTable extends WALEntryValue {
void markAsNotGeneratedByPipe();
boolean isTotallyGeneratedByPipe();
+
+ Map<String, Long> getTopKTime();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 4c07c3ece33..4b643cb35d1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
@@ -83,6 +84,8 @@ public interface IWritableMemChunk extends WALEntryValue {
IMeasurementSchema getSchema();
+ void setSchema(IMeasurementSchema s);
+
/**
* served for read requests.
*
@@ -128,6 +131,10 @@ public interface IWritableMemChunk extends WALEntryValue {
return Long.MAX_VALUE;
}
+ default long getTopKTime() {
+ return Long.MAX_VALUE;
+ }
+
/** @return how many points are deleted */
int delete(long lowerBound, long upperBound);
@@ -141,5 +148,7 @@ public interface IWritableMemChunk extends WALEntryValue {
long getLastPoint();
+ IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException;
+
boolean isEmpty();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
index ecaf8b2b738..eb3c1f0854d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -53,5 +54,9 @@ public interface IWritableMemChunkGroup extends WALEntryValue
{
long getCurrentTVListSize(String measurement);
+ IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException;
+
long getMaxTime();
+
+ long getTopKTime();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
index 70b0f909381..cfdd9c256c1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
@@ -37,6 +37,14 @@ public class PrimitiveMemTable extends AbstractMemTable {
super(database, dataRegionId, memTableMap);
}
+ public PrimitiveMemTable(
+ String database,
+ String dataRegionId,
+ Map<IDeviceID, IWritableMemChunkGroup> memTableMap,
+ AbstractMemTable lastMemTable) {
+ super(database, dataRegionId, memTableMap, lastMemTable);
+ }
+
@Override
public IMemTable copy() {
Map<IDeviceID, IWritableMemChunkGroup> newMap = new
HashMap<>(getMemTableMap());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 953606ed136..fd8331b7cbe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -62,6 +62,7 @@ import
org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -93,6 +94,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.GET_QUERY_RESOURCE_FROM_MEM;
import static
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.FLUSHING_MEMTABLE;
import static
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORKING_MEMTABLE;
+import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE;
@SuppressWarnings("java:S1135") // ignore todos
public class TsFileProcessor {
@@ -180,6 +182,8 @@ public class TsFileProcessor {
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
PerformanceOverviewMetrics.getInstance();
+ private File tsfileTemp;
+
@SuppressWarnings("squid:S107")
public TsFileProcessor(
String storageGroupName,
@@ -324,6 +328,39 @@ public class TsFileProcessor {
dataRegionInfo.getDataRegion().getDatabaseName(),
dataRegionInfo.getDataRegion().getDataRegionId());
walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
+ logger.info(
+ "Succeed to create {} memTable {}",
+ sequence ? "Sequence" : "Unsequence",
+ workMemTable.getMemTableId());
+ }
+
+ private void createNewWorkingMemTable(IMemTable tobeFlushed) throws
WriteProcessException {
+ if (MEMTABLE_TOPK_SIZE != 0) {
+ try {
+ logger.info(
+ "Try to divide memTable {} with size {} and points {}",
+ tobeFlushed.getMemTableId(),
+ tobeFlushed.memSize(),
+ tobeFlushed.getTotalPointsNum());
+ workMemTable = tobeFlushed.divide();
+ walNode.onMemTableCreated(workMemTable,
tsFileResource.getTsFilePath());
+ logger.info(
+ "Succeed to divide {} memTable {} with size {} and points {} to "
+ + "new workMemTable {} with size {} and points {} to",
+ sequence ? "Sequence" : "Unsequence",
+ tobeFlushed.getMemTableId(),
+ tobeFlushed.memSize(),
+ tobeFlushed.getTotalPointsNum(),
+ workMemTable.getMemTableId(),
+ workMemTable.memSize(),
+ workMemTable.getTotalPointsNum());
+ } catch (TopkDivideMemoryNotEnoughException e) {
+ logger.warn(e.getMessage());
+ createNewWorkingMemTable();
+ }
+ } else {
+ createNewWorkingMemTable();
+ }
}
/**
@@ -854,18 +891,19 @@ public class TsFileProcessor {
try {
if (logger.isDebugEnabled()) {
if (workMemTable != null) {
- logger.debug(
- "{}: flush a working memtable in async close tsfile {}, memtable
size: {}, tsfile "
- + "size: {}, plan index: [{}, {}], progress index: {}",
+ logger.info(
+ "{}: flush working memtable {} with size: {}, tsfile "
+ + "size: {}, plan index: [{}, {}], progress index: {} in
async close tsfile {}",
storageGroupName,
- tsFileResource.getTsFile().getAbsolutePath(),
+ workMemTable.getMemTableId(),
workMemTable.memSize(),
tsFileResource.getTsFileSize(),
workMemTable.getMinPlanIndex(),
workMemTable.getMaxPlanIndex(),
- tsFileResource.getMaxProgressIndex());
+ tsFileResource.getMaxProgressIndex(),
+ tsFileResource.getTsFile().getAbsolutePath());
} else {
- logger.debug(
+ logger.info(
"{}: flush a NotifyFlushMemTable in async close tsfile {},
tsfile size: {}",
storageGroupName,
tsFileResource.getTsFile().getAbsolutePath(),
@@ -899,6 +937,12 @@ public class TsFileProcessor {
// When invoke closing TsFile after insert data to memTable, we
shouldn't flush until invoke
// flushing memTable in System module.
Future<?> future = addAMemtableIntoFlushingList(tmpMemTable);
+ logger.info(
+ "Memtable {}: {} with size {} and points {} has been added to
flushing list",
+ tmpMemTable.getMemTableId(),
+ tmpMemTable,
+ tmpMemTable.memSize(),
+ tmpMemTable.getTotalPointsNum());
shouldClose = true;
return future;
} catch (Exception e) {
@@ -982,7 +1026,9 @@ public class TsFileProcessor {
return;
}
logger.info(
- "Async flush a memtable to tsfile: {}",
tsFileResource.getTsFile().getAbsolutePath());
+ "Async flush a memtable {} to tsfile: {}",
+ workMemTable.getMemTableId(),
+ tsFileResource.getTsFile().getAbsolutePath());
addAMemtableIntoFlushingList(workMemTable);
} catch (Exception e) {
logger.error(
@@ -1007,7 +1053,9 @@ public class TsFileProcessor {
private Future<?> addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws
IOException {
Map<String, Long> lastTimeForEachDevice = new HashMap<>();
if (sequence) {
- lastTimeForEachDevice = tobeFlushed.getMaxTime();
+ // TODO:修改lastFlushTime的取法
+ lastTimeForEachDevice = tobeFlushed.getTopKTime();
+ // lastTimeForEachDevice = tobeFlushed.getMaxTime();
// If some devices have been removed in MemTable, the number of device
in MemTable and
// tsFileResource will not be the same. And the endTime of these devices
in resource will be
// Long.minValue.
@@ -1027,7 +1075,18 @@ public class TsFileProcessor {
updateLatestFlushTimeCallback.call(this, lastTimeForEachDevice,
lastWorkMemtableFlushTime);
SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
- flushingMemTables.addLast(tobeFlushed);
+ // TODO:如果是顺序区并且k>0,拆分之后分别进入不同的waitinglist中,否则的话仍然按照原方式刷盘
+ if (MEMTABLE_TOPK_SIZE == 0) {
+ flushingMemTables.addLast(tobeFlushed);
+ workMemTable = null;
+ } else {
+ try {
+ createNewWorkingMemTable(tobeFlushed);
+ } catch (WriteProcessException e) {
+ throw new RuntimeException(e);
+ }
+ flushingMemTables.addLast(tobeFlushed);
+ }
if (logger.isDebugEnabled()) {
logger.debug(
"{}: {} Memtable (signal = {}) is added into the flushing Memtable,
queue size = {}",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index c1a71d32a53..c750fb6db69 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -55,7 +56,13 @@ public class WritableMemChunk implements IWritableMemChunk {
this.list = TVList.newList(schema.getType());
}
- private WritableMemChunk() {}
+ public WritableMemChunk(IMeasurementSchema schema, TVList ls) {
+ this.schema = schema;
+ this.list = ls;
+ }
+
+ // private WritableMemChunk() {}
+ public WritableMemChunk() {}
@Override
public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
@@ -251,11 +258,21 @@ public class WritableMemChunk implements
IWritableMemChunk {
return schema;
}
+ @Override
+ public void setSchema(IMeasurementSchema s) {
+ this.schema = s;
+ }
+
@Override
public long getMaxTime() {
return list.getMaxTime();
}
+ @Override
+ public long getTopKTime() {
+ return list.getTopKTime();
+ }
+
@Override
public long getFirstPoint() {
if (list.rowCount() == 0) {
@@ -274,6 +291,11 @@ public class WritableMemChunk implements IWritableMemChunk
{
.getTimestamp();
}
+ @Override
+ public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException {
+ return new WritableMemChunk(this.schema, list.divide());
+ }
+
@Override
public boolean isEmpty() {
return list.rowCount() == 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 707eace7fa5..dbe03f14c02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternUtil;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -172,6 +173,25 @@ public class WritableMemChunkGroup implements
IWritableMemChunkGroup {
return maxTime;
}
+ @Override
+ public long getTopKTime() {
+ long maxTime = Long.MIN_VALUE;
+ for (IWritableMemChunk memChunk : memChunkMap.values()) {
+ maxTime = Math.max(maxTime, memChunk.getTopKTime());
+ }
+ return maxTime;
+ }
+
+ @Override
+ public IWritableMemChunkGroup divide() throws
TopkDivideMemoryNotEnoughException {
+ WritableMemChunkGroup topkMemChunkGroup = new WritableMemChunkGroup();
+ for (String key : memChunkMap.keySet()) {
+ IWritableMemChunk topkMemChunk = memChunkMap.get(key).divide();
+ topkMemChunkGroup.memChunkMap.put(key, topkMemChunk);
+ }
+ return topkMemChunkGroup;
+ }
+
@Override
public int serializedSize() {
int size = 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
index 33264fc46c1..4f1edd83ca1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
@@ -40,6 +40,7 @@ public class PrimitiveArrayManager {
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
public static final int ARRAY_SIZE = CONFIG.getPrimitiveArraySize();
+ public static final int MEMTABLE_TOPK_SIZE = CONFIG.getSeqMemtableTopKSize();
public static final TVListSortAlgorithm TVLIST_SORT_ALGORITHM =
CONFIG.getTvListSortAlgorithm();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 4092db7d442..ee451ac026f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -43,8 +43,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
-import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*;
import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
import static
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
import static
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
@@ -647,6 +646,37 @@ public abstract class AlignedTVList extends TVList {
}
}
+ @Override
+ public TVList divide() throws TopkDivideMemoryNotEnoughException {
+ if (MEMTABLE_TOPK_SIZE > rowCount || MEMTABLE_TOPK_SIZE < ARRAY_SIZE) {
+ throw new TopkDivideMemoryNotEnoughException(
+ String.format(
+ "WARNING: MEMTABLE_TOPK_SIZE is bigger than%d "
+ + "the TVList's row count %d"
+ + "or is smaller than ARRAY_SIZE %d",
+ rowCount, MEMTABLE_TOPK_SIZE, ARRAY_SIZE));
+ }
+ AlignedTVList topkTVList = AlignedTVList.newAlignedList(dataTypes);
+ int truncatedIndex = (rowCount - MEMTABLE_TOPK_SIZE + ARRAY_SIZE - 1) /
ARRAY_SIZE;
+
+ for (int i = truncatedIndex + 1; i <= rowCount / ARRAY_SIZE; i++) {
+ topkTVList.timestamps.add(timestamps.get(i));
+ for (int colIndex = 0; colIndex < values.size(); colIndex++) {
+ topkTVList.values.get(colIndex).add(values.get(colIndex).get(i));
+ topkTVList.bitMaps.get(colIndex).add(bitMaps.get(colIndex).get(i));
+ }
+ }
+ for (int i = rowCount / ARRAY_SIZE; i >= truncatedIndex + 1; i--) {
+ timestamps.remove(timestamps.size() - 1);
+ for (int colIndex = 0; colIndex < values.size(); colIndex++) {
+ topkTVList.values.get(colIndex).remove(values.get(colIndex).size() -
1);
+ topkTVList.bitMaps.get(colIndex).remove(bitMaps.get(colIndex).size() -
1);
+ }
+ }
+ rowCount = truncatedIndex * ARRAY_SIZE;
+ topKTime = timestamps.get(truncatedIndex - 1)[ARRAY_SIZE - 1];
+ return topkTVList;
+ }
/**
* Get the row index value in index column.
*
@@ -749,6 +779,8 @@ public abstract class AlignedTVList extends TVList {
checkExpansion();
}
}
+ sort(Math.max(0, start - MEMTABLE_TOPK_SIZE), end);
+ topKTime = Math.max(topKTime, getTime(Math.max(0, end -
MEMTABLE_TOPK_SIZE)));
}
private void arrayCopy(Object[] value, int idx, int arrayIndex, int
elementIndex, int remaining) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
index 6271a31734e..21783bc0370 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
@@ -53,7 +53,7 @@ public interface BackwardSort extends QuickSort {
/**
* check block-inversions to find the proper block_size, which is a multiple
of array_size. For
- * totally ordered, the block_size will equals to array_size For totally
reverse ordered, the
+ * totally ordered, the block_size will equal to array_size For totally
reverse ordered, the
* block_size will equals to the rowCount. INVERSION_RATIOS_THRESHOLD=0.005
is a empiric value.
*
* @param timestamps
@@ -132,6 +132,6 @@ public interface BackwardSort extends QuickSort {
* @param hi
*/
default void sortBlock(int lo, int hi) {
- qsort(lo, hi);
+ qSort(lo, hi);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index c64dbc94fd8..a59fa4f0342 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -34,8 +34,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
-import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*;
public abstract class IntTVList extends TVList {
// list of primitive array, add 1 when expanded -> int primitive array
@@ -77,14 +76,59 @@ public abstract class IntTVList extends TVList {
@Override
public void putInt(long timestamp, int value) {
checkExpansion();
- int arrayIndex = rowCount / ARRAY_SIZE;
- int elementIndex = rowCount % ARRAY_SIZE;
- maxTime = Math.max(maxTime, timestamp);
- timestamps.get(arrayIndex)[elementIndex] = timestamp;
- values.get(arrayIndex)[elementIndex] = value;
- rowCount++;
- if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
- sorted = false;
+ if (MEMTABLE_TOPK_SIZE == 0) {
+ int arrayIndex = rowCount / ARRAY_SIZE;
+ int elementIndex = rowCount % ARRAY_SIZE;
+ maxTime = Math.max(maxTime, timestamp);
+ topKTime = Math.max(topKTime, timestamp);
+ timestamps.get(arrayIndex)[elementIndex] = timestamp;
+ values.get(arrayIndex)[elementIndex] = value;
+ rowCount++;
+ if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
+ sorted = false;
+ }
+ } else {
+ int arrayIndex;
+ int elementIndex;
+ int waitlen = Math.min(rowCount, MEMTABLE_TOPK_SIZE);
+ int tempRowCount = rowCount - 1; // 记录向前比较的截止位置
+ while (waitlen > 0) {
+ waitlen--;
+ arrayIndex = tempRowCount / ARRAY_SIZE;
+ elementIndex = tempRowCount % ARRAY_SIZE;
+ if (timestamps.get(arrayIndex)[elementIndex] > timestamp) {
+ tempRowCount--;
+ continue;
+ }
+ break;
+ }
+ arrayIndex = rowCount / ARRAY_SIZE;
+ elementIndex = rowCount % ARRAY_SIZE;
+ int arrayIndexLeft = (rowCount - 1) / ARRAY_SIZE;
+ int elementIndexLeft = (rowCount - 1) % ARRAY_SIZE;
+ for (int i = rowCount - 1; i > tempRowCount; i--) {
+ timestamps.get(arrayIndex)[elementIndex] =
timestamps.get(arrayIndexLeft)[elementIndexLeft];
+ values.get(arrayIndex)[elementIndex] =
values.get(arrayIndexLeft)[elementIndexLeft];
+ arrayIndex = arrayIndexLeft;
+ elementIndex = elementIndexLeft;
+ arrayIndexLeft = (i - 1) / ARRAY_SIZE;
+ elementIndexLeft = (i - 1) % ARRAY_SIZE;
+ sortCount++;
+ }
+ arrayIndex = (tempRowCount + 1) / ARRAY_SIZE;
+ elementIndex = (tempRowCount + 1) % ARRAY_SIZE;
+ maxTime = Math.max(maxTime, timestamp);
+ timestamps.get(arrayIndex)[elementIndex] = timestamp;
+ values.get(arrayIndex)[elementIndex] = value;
+ if (rowCount > MEMTABLE_TOPK_SIZE) {
+ arrayIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) / ARRAY_SIZE;
+ elementIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) % ARRAY_SIZE;
+ topKTime = Math.max(topKTime,
timestamps.get(arrayIndex)[elementIndex]);
+ }
+ rowCount++;
+ if (sorted && rowCount > 1 && waitlen == 0) {
+ sorted = false;
+ }
}
}
@@ -123,6 +167,37 @@ public abstract class IntTVList extends TVList {
values.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
}
+ @Override
+ public TVList divide() throws TopkDivideMemoryNotEnoughException {
+ if (MEMTABLE_TOPK_SIZE > rowCount || MEMTABLE_TOPK_SIZE < ARRAY_SIZE) {
+ throw new TopkDivideMemoryNotEnoughException(
+ String.format(
+ "WARNING: MEMTABLE_TOPK_SIZE %d is bigger than "
+ + "the TVList's row count %d, "
+ + "or is smaller than ARRAY_SIZE %d",
+ MEMTABLE_TOPK_SIZE, rowCount, ARRAY_SIZE));
+ }
+ IntTVList topkTVList = IntTVList.newList();
+ int truncatedIndex = rowCount - MEMTABLE_TOPK_SIZE;
+ int truncatedArrayIndex =
+ truncatedIndex / ARRAY_SIZE; // no matter truncatedIndex in or not in
the block
+ truncatedIndex = truncatedArrayIndex * ARRAY_SIZE;
+ for (int i = truncatedArrayIndex; i < timestamps.size(); i++) {
+ topkTVList.timestamps.add(timestamps.get(i));
+ topkTVList.values.add(values.get(i));
+ }
+ for (int i = timestamps.size() - 1; i >= truncatedArrayIndex; i--) {
+ timestamps.remove(timestamps.size() - 1);
+ values.remove(values.size() - 1);
+ }
+ topkTVList.rowCount = rowCount - truncatedIndex;
+ topkTVList.sorted = true;
+ topkTVList.topKTime = topkTVList.getTime(0);
+ rowCount = truncatedIndex;
+ topKTime = topkTVList.getTopKTime();
+ return topkTVList;
+ }
+
@Override
public TimeValuePair getTimeValuePair(int index) {
return new TimeValuePair(
@@ -159,47 +234,8 @@ public abstract class IntTVList extends TVList {
@Override
public void putInts(long[] time, int[] value, BitMap bitMap, int start, int
end) {
- checkExpansion();
-
- int idx = start;
- // constraint: time.length + timeIdxOffset == value.length
- int timeIdxOffset = 0;
- if (bitMap != null && !bitMap.isAllUnmarked()) {
- // time array is a reference, should clone necessary time values
- long[] clonedTime = new long[end - start];
- System.arraycopy(time, start, clonedTime, 0, end - start);
- time = clonedTime;
- timeIdxOffset = start;
- // drop null at the end of value array
- int nullCnt =
- dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start,
end, timeIdxOffset);
- end -= nullCnt;
- } else {
- updateMaxTimeAndSorted(time, start, end);
- }
-
- while (idx < end) {
- int inputRemaining = end - idx;
- int arrayIdx = rowCount / ARRAY_SIZE;
- int elementIdx = rowCount % ARRAY_SIZE;
- int internalRemaining = ARRAY_SIZE - elementIdx;
- if (internalRemaining >= inputRemaining) {
- // the remaining inputs can fit the last array, copy all remaining
inputs into last array
- System.arraycopy(
- time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx,
inputRemaining);
- System.arraycopy(value, idx, values.get(arrayIdx), elementIdx,
inputRemaining);
- rowCount += inputRemaining;
- break;
- } else {
- // the remaining inputs cannot fit the last array, fill the last array
and create a new
- // one and enter the next loop
- System.arraycopy(
- time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx,
internalRemaining);
- System.arraycopy(value, idx, values.get(arrayIdx), elementIdx,
internalRemaining);
- idx += internalRemaining;
- rowCount += internalRemaining;
- checkExpansion();
- }
+ for (int i = start; i < end; i++) {
+ putInt(time[i], value[i]);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
index 5e6a9dec2dc..d60067af576 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
@@ -30,11 +30,16 @@ public class QuickAlignedTVList extends AlignedTVList
implements QuickSort {
@Override
public void sort() {
if (!sorted) {
- qsort(0, rowCount - 1);
+ qSort(0, rowCount - 1);
}
sorted = true;
}
+ @Override
+ public void sort(int lo, int hi) {
+ qSort(lo, hi);
+ }
+
@Override
protected void set(int src, int dest) {
long srcT = getTime(src);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
index dcdcbc2cb59..ee4e6f9cbc8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
@@ -42,11 +42,16 @@ public class QuickBinaryTVList extends BinaryTVList
implements QuickSort {
@Override
public void sort() {
if (!sorted) {
- qsort(0, rowCount - 1);
+ qSort(0, rowCount - 1);
}
sorted = true;
}
+ @Override
+ public void sort(int lo, int hi) {
+ qSort(lo, hi);
+ }
+
@Override
protected void set(int src, int dest) {
long srcT = getTime(src);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
index e4fbfaefca2..c7b43470551 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
@@ -39,11 +39,16 @@ public class QuickBooleanTVList extends BooleanTVList
implements QuickSort {
@Override
public void sort() {
if (!sorted) {
- qsort(0, rowCount - 1);
+ qSort(0, rowCount - 1);
}
sorted = true;
}
+ @Override
+ public void sort(int lo, int hi) {
+ qSort(lo, hi);
+ }
+
@Override
protected void set(int src, int dest) {
long srcT = getTime(src);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
index 852844f9231..584c8f8f789 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
@@ -39,11 +39,16 @@ public class QuickDoubleTVList extends DoubleTVList
implements QuickSort {
@Override
public void sort() {
if (!sorted) {
- qsort(0, rowCount - 1);
+ qSort(0, rowCount - 1);
}
sorted = true;
}
+ @Override
+ public void sort(int lo, int hi) {
+ qSort(lo, hi);
+ }
+
@Override
protected void set(int src, int dest) {
long srcT = getTime(src);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
index 409a00a093e..90485257276 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
@@ -39,11 +39,16 @@ public class QuickFloatTVList extends FloatTVList
implements QuickSort {
@Override
public void sort() {
if (!sorted) {
- qsort(0, rowCount - 1);
+ qSort(0, rowCount - 1);
}
sorted = true;
}
+ @Override
+ public void sort(int lo, int hi) {
+ qSort(lo, hi);
+ }
+
@Override
protected void set(int src, int dest) {
long srcT = getTime(src);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
index bc44cf79053..d24842babac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
@@ -22,11 +22,16 @@ public class QuickIntTVList extends IntTVList implements
QuickSort {
@Override
public void sort() {
if (!sorted) {
- qsort(0, rowCount - 1);
+ qSort(0, rowCount - 1);
}
sorted = true;
}
+ @Override
+ public void sort(int lo, int hi) {
+ qSort(lo, hi);
+ }
+
@Override
public void swap(int p, int q) {
int valueP = getInt(p);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
index 16d8fab3606..3d4f1ab1a7c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
@@ -39,11 +39,16 @@ public class QuickLongTVList extends LongTVList implements
QuickSort {
@Override
public void sort() {
if (!sorted) {
- qsort(0, rowCount - 1);
+ qSort(0, rowCount - 1);
}
sorted = true;
}
+ @Override
+ public void sort(int lo, int hi) {
+ qSort(lo, hi);
+ }
+
@Override
protected void set(int src, int dest) {
long srcT = getTime(src);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
index 8229781aa05..9bad94d4f9b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
@@ -58,7 +58,7 @@ public interface QuickSort {
// }
// }
- default void qsort(int lo, int hi) {
+ default void qSort(int lo, int hi) {
if (lo < hi) {
// TODO: use insertion sort in smaller array
// if(hi - lo <= 32) {
@@ -66,8 +66,8 @@ public interface QuickSort {
// }
// partition
int pivotIndex = partition(lo, hi);
- qsort(lo, pivotIndex - 1);
- qsort(pivotIndex + 1, hi);
+ qSort(lo, pivotIndex - 1);
+ qSort(pivotIndex + 1, hi);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index f2b26158a5f..8430df0b317 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -42,6 +42,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
+import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE;
import static
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
import static
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
@@ -56,6 +57,8 @@ public abstract class TVList implements WALEntryValue {
protected boolean sorted = true;
protected long maxTime;
+ protected long topKTime;
+ protected int sortCount;
// record reference count of this tv list
// currently this reference will only be increase because we can't know when
to decrease it
protected AtomicInteger referenceCount;
@@ -64,7 +67,9 @@ public abstract class TVList implements WALEntryValue {
protected TVList() {
timestamps = new ArrayList<>();
rowCount = 0;
+ sortCount = 0;
maxTime = Long.MIN_VALUE;
+ topKTime = Long.MIN_VALUE;
referenceCount = new AtomicInteger();
}
@@ -107,6 +112,8 @@ public abstract class TVList implements WALEntryValue {
public abstract void sort();
+ public abstract void sort(int lo, int hi);
+
public void increaseReferenceCount() {
referenceCount.incrementAndGet();
}
@@ -212,6 +219,10 @@ public abstract class TVList implements WALEntryValue {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
+ public TVList divide() throws TopkDivideMemoryNotEnoughException {
+ throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+ }
+
public Object getAlignedValue(int index) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
@@ -229,6 +240,10 @@ public abstract class TVList implements WALEntryValue {
return maxTime;
}
+ public long getTopKTime() {
+ return topKTime;
+ }
+
public long getVersion() {
return version;
}
@@ -328,6 +343,7 @@ public abstract class TVList implements WALEntryValue {
for (int i = start; i < end; i++) {
inPutMinTime = Math.min(inPutMinTime, time[i]);
maxTime = Math.max(maxTime, time[i]);
+ topKTime = Math.max(topKTime, time[Math.max(0, i - MEMTABLE_TOPK_SIZE)]);
if (inputSorted && i < length - 1 && time[i] > time[i + 1]) {
inputSorted = false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
index 7c6fa7f92ef..819b00e3c93 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
@@ -50,7 +50,7 @@ public class TimAlignedTVList extends AlignedTVList
implements TimSort {
(int[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount);
}
if (!sorted) {
- sort(0, rowCount);
+ this.timSort(0, rowCount);
}
clearSortedValue();
clearSortedTime();
@@ -58,7 +58,12 @@ public class TimAlignedTVList extends AlignedTVList
implements TimSort {
}
@Override
- public void tim_set(int src, int dest) {
+ public void sort(int lo, int hi) {
+ timSort(lo, hi);
+ }
+
+ @Override
+ public void timSet(int src, int dest) {
set(src, dest);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
index 03760fd14c8..fccb64a1b2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
@@ -44,14 +44,19 @@ public class TimBinaryTVList extends BinaryTVList
implements TimSort {
sortedValues =
(Binary[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.TEXT, rowCount);
}
- sort(0, rowCount);
+ timSort(0, rowCount);
clearSortedValue();
clearSortedTime();
sorted = true;
}
@Override
- public void tim_set(int src, int dest) {
+ public void sort(int lo, int hi) {
+ timSort(lo, hi);
+ }
+
+ @Override
+ public void timSet(int src, int dest) {
set(src, dest);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
index 22b56475905..04d43f30bb9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
@@ -43,7 +43,7 @@ public class TimBooleanTVList extends BooleanTVList
implements TimSort {
(boolean[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.BOOLEAN, rowCount);
}
if (!sorted) {
- sort(0, rowCount);
+ timSort(0, rowCount);
}
clearSortedValue();
clearSortedTime();
@@ -51,7 +51,12 @@ public class TimBooleanTVList extends BooleanTVList
implements TimSort {
}
@Override
- public void tim_set(int src, int dest) {
+ public void sort(int lo, int hi) {
+ timSort(lo, hi);
+ }
+
+ @Override
+ public void timSet(int src, int dest) {
set(src, dest);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
index 3c0aa823798..543a4e2ec29 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
@@ -43,7 +43,7 @@ public class TimDoubleTVList extends DoubleTVList implements
TimSort {
(double[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.DOUBLE, rowCount);
}
if (!sorted) {
- sort(0, rowCount);
+ timSort(0, rowCount);
}
clearSortedValue();
clearSortedTime();
@@ -51,7 +51,12 @@ public class TimDoubleTVList extends DoubleTVList implements
TimSort {
}
@Override
- public void tim_set(int src, int dest) {
+ public void sort(int lo, int hi) {
+ timSort(lo, hi);
+ }
+
+ @Override
+ public void timSet(int src, int dest) {
set(src, dest);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
index 2b1fb440a95..d360959a81e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
@@ -44,7 +44,7 @@ public class TimFloatTVList extends FloatTVList implements
TimSort {
(float[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.FLOAT, rowCount);
}
if (!sorted) {
- sort(0, rowCount);
+ timSort(0, rowCount);
}
clearSortedValue();
clearSortedTime();
@@ -52,7 +52,12 @@ public class TimFloatTVList extends FloatTVList implements
TimSort {
}
@Override
- public void tim_set(int src, int dest) {
+ public void sort(int lo, int hi) {
+ timSort(lo, hi);
+ }
+
+ @Override
+ public void timSet(int src, int dest) {
set(src, dest);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
index b3ae939892d..d2f153e05a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
@@ -33,18 +33,22 @@ public class TimIntTVList extends IntTVList implements
TimSort {
@Override
public void sort() {
+ sort(0, rowCount);
+ }
+
+ @Override
+ public void sort(int lo, int hi) {
+ int len = hi - lo;
if (sortedTimestamps == null
- || sortedTimestamps.length <
PrimitiveArrayManager.getArrayRowCount(rowCount)) {
+ || sortedTimestamps.length <
PrimitiveArrayManager.getArrayRowCount(len)) {
sortedTimestamps =
- (long[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount);
+ (long[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, len);
}
- if (sortedValues == null
- || sortedValues.length <
PrimitiveArrayManager.getArrayRowCount(rowCount)) {
- sortedValues =
- (int[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount);
+ if (sortedValues == null || sortedValues.length <
PrimitiveArrayManager.getArrayRowCount(len)) {
+ sortedValues = (int[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, len);
}
if (!sorted) {
- sort(0, rowCount);
+ timSort(0, len);
}
clearSortedValue();
clearSortedTime();
@@ -52,7 +56,7 @@ public class TimIntTVList extends IntTVList implements
TimSort {
}
@Override
- public void tim_set(int src, int dest) {
+ public void timSet(int src, int dest) {
set(src, dest);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
index e3a02e41db5..9096cd47e1b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
@@ -43,7 +43,7 @@ public class TimLongTVList extends LongTVList implements
TimSort {
(long[][])
PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount);
}
if (!sorted) {
- sort(0, rowCount);
+ timSort(0, rowCount);
}
clearSortedValue();
clearSortedTime();
@@ -51,7 +51,12 @@ public class TimLongTVList extends LongTVList implements
TimSort {
}
@Override
- public void tim_set(int src, int dest) {
+ public void sort(int lo, int hi) {
+ timSort(lo, hi);
+ }
+
+ @Override
+ public void timSet(int src, int dest) {
set(src, dest);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
index 63584429d4a..0426fdafe66 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
@@ -27,7 +27,7 @@ public interface TimSort {
int SMALL_ARRAY_LENGTH = 32;
/** the same as the 'set' function in TVList, the reason is to avoid two
equal functions. */
- void tim_set(int src, int dest);
+ void timSet(int src, int dest);
void setFromSorted(int src, int dest);
@@ -55,7 +55,7 @@ public interface TimSort {
* the entrance of tim_sort; 1. array_size <= 32, use binary sort. 2.
recursively invoke merge
* sort.
*/
- default void sort(int lo, int hi) {
+ default void timSort(int lo, int hi) {
if (lo == hi) {
return;
}
@@ -65,8 +65,8 @@ public interface TimSort {
return;
}
int mid = (lo + hi) >>> 1;
- sort(lo, mid);
- sort(mid, hi);
+ timSort(lo, mid);
+ timSort(mid, hi);
merge(lo, mid, hi);
}
@@ -127,7 +127,7 @@ public interface TimSort {
*/
int n = start - left; // The number of elements to move
for (int i = n; i >= 1; i--) {
- tim_set(left + i - 1, left + i);
+ timSet(left + i - 1, left + i);
}
setPivotTo(left);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
new file mode 100644
index 00000000000..e1d8d727ec6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.utils.datastructure;
+
+public class TopkDivideMemoryNotEnoughException extends Exception {
+
+ public TopkDivideMemoryNotEnoughException(String message) {
+ super(message);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index eb959726c5c..c9d3a67f323 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -464,7 +464,7 @@ data_replication_factor=1
# Add a switch to enable separate sequence and unsequence data.
# If it is true, then data will be separated into seq and unseq data dir. If
it is false, then all data will be written into unseq data dir.
# Datatype: boolean
-# enable_separate_data=true
+enable_separate_data=false
# What will the system do when unrecoverable error occurs.
# Datatype: String
@@ -509,9 +509,14 @@ data_replication_factor=1
# BACKWARD: backward sort
# tvlist_sort_algorithm=TIM
+# The number of points stay in the next tvlist
+# The default waiting size is 0
+# Datatype: int
+seq_memtable_topk_size=0
+
# When the average point number of timeseries in memtable exceeds this, the
memtable is flushed to disk. The default threshold is 100000.
# Datatype: int
-# avg_series_point_number_threshold=100000
+avg_series_point_number_threshold=100000
# How many threads can concurrently flush. When <= 0, use CPU core number.
# Datatype: int
@@ -544,15 +549,15 @@ data_replication_factor=1
####################
# sequence space compaction: only compact the sequence files
# Datatype: boolean
-# enable_seq_space_compaction=true
+enable_seq_space_compaction=false
# unsequence space compaction: only compact the unsequence files
# Datatype: boolean
-# enable_unseq_space_compaction=true
+enable_unseq_space_compaction=false
# cross space compaction: compact the unsequence files into the overlapped
sequence files
# Datatype: boolean
-# enable_cross_space_compaction=true
+enable_cross_space_compaction=false
# the selector of cross space compaction task
# Options: rewrite
@@ -644,7 +649,7 @@ data_replication_factor=1
# The interval of compaction task schedule
# Datatype: long, Unit: ms
-# compaction_schedule_interval_in_ms=60000
+compaction_schedule_interval_in_ms=600000000
# The interval of compaction task submission
# Datatype: long, Unit: ms