This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch research/deferredflushing
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/research/deferredflushing by 
this push:
     new c10e124903a [Deferred Flushing] Implementation of deferred flushing 
strategy. (#14724)
c10e124903a is described below

commit c10e124903a2ff38ff13770d8b9d93ec7305fade
Author: DeferredFlushing <[email protected]>
AuthorDate: Sat Jan 18 19:20:32 2025 +0800

    [Deferred Flushing] Implementation of deferred flushing strategy. (#14724)
    
    Co-authored-by: DeferredFlushing <>
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../db/storageengine/dataregion/DataRegion.java    |   8 +-
 .../dataregion/flush/MemTableFlushTask.java        |  25 ++++
 .../dataregion/memtable/AbstractMemTable.java      |  71 ++++++++++-
 .../memtable/AlignedWritableMemChunk.java          |  14 +++
 .../memtable/AlignedWritableMemChunkGroup.java     |  13 ++
 .../dataregion/memtable/IMemTable.java             |   9 ++
 .../dataregion/memtable/IWritableMemChunk.java     |   9 ++
 .../memtable/IWritableMemChunkGroup.java           |   5 +
 .../dataregion/memtable/PrimitiveMemTable.java     |   8 ++
 .../dataregion/memtable/TsFileProcessor.java       |  77 ++++++++++--
 .../dataregion/memtable/WritableMemChunk.java      |  24 +++-
 .../dataregion/memtable/WritableMemChunkGroup.java |  20 +++
 .../rescon/memory/PrimitiveArrayManager.java       |   1 +
 .../db/utils/datastructure/AlignedTVList.java      |  36 +++++-
 .../iotdb/db/utils/datastructure/BackwardSort.java |   4 +-
 .../iotdb/db/utils/datastructure/IntTVList.java    | 138 +++++++++++++--------
 .../db/utils/datastructure/QuickAlignedTVList.java |   7 +-
 .../db/utils/datastructure/QuickBinaryTVList.java  |   7 +-
 .../db/utils/datastructure/QuickBooleanTVList.java |   7 +-
 .../db/utils/datastructure/QuickDoubleTVList.java  |   7 +-
 .../db/utils/datastructure/QuickFloatTVList.java   |   7 +-
 .../db/utils/datastructure/QuickIntTVList.java     |   7 +-
 .../db/utils/datastructure/QuickLongTVList.java    |   7 +-
 .../iotdb/db/utils/datastructure/QuickSort.java    |   6 +-
 .../iotdb/db/utils/datastructure/TVList.java       |  16 +++
 .../db/utils/datastructure/TimAlignedTVList.java   |   9 +-
 .../db/utils/datastructure/TimBinaryTVList.java    |   9 +-
 .../db/utils/datastructure/TimBooleanTVList.java   |   9 +-
 .../db/utils/datastructure/TimDoubleTVList.java    |   9 +-
 .../db/utils/datastructure/TimFloatTVList.java     |   9 +-
 .../iotdb/db/utils/datastructure/TimIntTVList.java |  20 +--
 .../db/utils/datastructure/TimLongTVList.java      |   9 +-
 .../iotdb/db/utils/datastructure/TimSort.java      |  10 +-
 .../TopkDivideMemoryNotEnoughException.java        |   8 ++
 .../resources/conf/iotdb-common.properties         |  17 ++-
 37 files changed, 547 insertions(+), 111 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a9ec1ee5f5b..f3d0c7b611d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -410,6 +410,9 @@ public class IoTDBConfig {
   /** The sort algorithm used in TVList */
   private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;
 
+  /** The value of primitive topk area size in the tvlist */
+  private int seqMemtableTopKSize = 0;
+
   /** When average series point number reaches this, flush the memtable to 
disk */
   private int avgSeriesPointNumberThreshold = 100000;
 
@@ -2070,6 +2073,14 @@ public class IoTDBConfig {
     this.tvListSortAlgorithm = tvListSortAlgorithm;
   }
 
+  public int getSeqMemtableTopKSize() {
+    return seqMemtableTopKSize;
+  }
+
+  public void setSeqMemtableTopKSize(int seqMemtableTopKSize) {
+    this.seqMemtableTopKSize = seqMemtableTopKSize;
+  }
+
   public int getAvgSeriesPointNumberThreshold() {
     return avgSeriesPointNumberThreshold;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 9160cb8b0b1..2b728ca5865 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -387,6 +387,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "tvlist_sort_algorithm", 
conf.getTvListSortAlgorithm().toString())));
 
+    conf.setSeqMemtableTopKSize(
+        (Integer.parseInt(
+            properties.getProperty(
+                "seq_memtable_topk_size", 
Integer.toString(conf.getSeqMemtableTopKSize())))));
+
     conf.setAvgSeriesPointNumberThreshold(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 20cbfef2aee..eff6ddb30b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1391,7 +1391,6 @@ public class DataRegion implements IDataRegionForQuery {
             version,
             0,
             0);
-
     return getTsFileProcessor(sequence, filePath, timePartitionId);
   }
 
@@ -1503,6 +1502,7 @@ public class DataRegion implements IDataRegionForQuery {
       if 
(!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
         
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
       }
+      logger.info("close an unsequence tsfile processor {}", databaseName + 
"-" + dataRegionId);
     }
     if (workSequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == 
null
         && 
workUnsequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null) {
@@ -1663,7 +1663,8 @@ public class DataRegion implements IDataRegionForQuery {
       for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
         if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
           logger.info(
-              "Exceed sequence memtable flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
+              "Exceed sequence memtable {} flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
+              tsFileProcessor.getWorkMemTable().getMemTableId(),
               tsFileProcessor.getTimeRangeId(),
               databaseName,
               dataRegionId);
@@ -1689,7 +1690,8 @@ public class DataRegion implements IDataRegionForQuery {
       for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
         if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
           logger.info(
-              "Exceed unsequence memtable flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
+              "Exceed unsequence memtable {} flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
+              tsFileProcessor.getWorkMemTable().getMemTableId(),
               tsFileProcessor.getTimeRangeId(),
               databaseName,
               dataRegionId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index ab5440d5358..9becb3c560c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -110,6 +110,13 @@ public class MemTableFlushTask {
         memTable.getSeriesNumber() == 0
             ? 0
             : memTable.getTotalPointsNum() / memTable.getSeriesNumber();
+    LOGGER.info(
+        "The memTable {} size of SG {} is {}, the avg series points num in 
chunk is {}, total timeseries number is {}",
+        memTable.getMemTableId(),
+        storageGroup,
+        memTable.memSize(),
+        avgSeriesPointsNum,
+        memTable.getSeriesNumber());
     WRITING_METRICS.recordFlushingMemTableStatus(
         storageGroup,
         memTable.memSize(),
@@ -205,6 +212,13 @@ public class MemTableFlushTask {
             MetricLevel.CORE,
             Tag.NAME.toString(),
             "flush");
+
+    LOGGER.info(
+        "Database {} flushing memtable {} with {} has finished! Time 
consumption: {}ms",
+        storageGroup,
+        memTable.getMemTableId(),
+        memTable,
+        System.currentTimeMillis() - start);
   }
 
   /** encoding task (second task of pipeline) */
@@ -274,6 +288,17 @@ public class MemTableFlushTask {
                   databaseName ->
                       recordFlushPointsMetricInternal(
                           memTable.getTotalPointsNum(), databaseName, 
dataRegionId));
+
+          LOGGER.info(
+              "Database {}, flushing memtable {} with size {} and points {} in 
file "
+                  + "{} into disk: Encoding data cost "
+                  + "{} ms.",
+              storageGroup,
+              memTable.getMemTableId(),
+              memTable.memSize(),
+              memTable.getTotalPointsNum(),
+              writer.getFile().getName(),
+              memSerializeTime);
           WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING, 
memSerializeTime);
         }
       };
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index e54e7ef2c36..0bdebb21481 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.MemUtils;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -88,9 +89,9 @@ public abstract class AbstractMemTable implements IMemTable {
 
   private long minPlanIndex = Long.MAX_VALUE;
 
-  private final long memTableId = memTableIdCounter.incrementAndGet();
+  private long memTableId = memTableIdCounter.incrementAndGet();
 
-  private final long createdTime = System.currentTimeMillis();
+  private long createdTime = System.currentTimeMillis();
 
   /** this time is updated by the timed flush, same as createdTime when the 
feature is disabled. */
   private long updateTime = createdTime;
@@ -126,6 +127,34 @@ public abstract class AbstractMemTable implements 
IMemTable {
     this.memTableMap = memTableMap;
   }
 
+  /** create memtable from last memtable, preserve topk points */
+  protected AbstractMemTable(
+      String database,
+      String dataRegionId,
+      Map<IDeviceID, IWritableMemChunkGroup> memTableMap,
+      AbstractMemTable lastMemTable) {
+    this.database = database;
+    this.dataRegionId = dataRegionId;
+    this.memTableMap = memTableMap;
+    for (IDeviceID key : memTableMap.keySet()) {
+      Map<String, IWritableMemChunk> memChunkMap = 
memTableMap.get(key).getMemChunkMap();
+      for (String schema : memChunkMap.keySet()) {
+        IWritableMemChunk memChunk = memChunkMap.get(schema);
+        totalPointsNum += memChunk.getTVList().rowCount();
+        memSize +=
+            (long) memChunk.getTVList().rowCount()
+                * memChunk.getSchema().getType().getDataTypeSize();
+      }
+    }
+    totalPointsNumThreshold = lastMemTable.getTotalPointsNumThreshold();
+    createdTime = System.currentTimeMillis();
+    memTableId = memTableIdCounter.incrementAndGet();
+    seriesNumber = lastMemTable.getSeriesNumber();
+    tvListRamCost = lastMemTable.getTVListsRamCost() * totalPointsNum / 
lastMemTable.totalPointsNum;
+    shouldFlush = false;
+    flushStatus = FlushStatus.WORKING;
+  }
+
   @Override
   public Map<IDeviceID, IWritableMemChunkGroup> getMemTableMap() {
     return memTableMap;
@@ -439,6 +468,16 @@ public abstract class AbstractMemTable implements 
IMemTable {
     return memSize;
   }
 
+  @Override
+  public void setMemSize(long m) {
+    memSize = memSize + m;
+  }
+
+  @Override
+  public void setTotalPointsNum(long m) {
+    totalPointsNum = totalPointsNum + m;
+  }
+
   @Override
   public boolean reachTotalPointNumThreshold() {
     if (totalPointsNum == 0) {
@@ -529,6 +568,21 @@ public abstract class AbstractMemTable implements 
IMemTable {
     }
   }
 
+  @Override
+  public IMemTable divide() throws TopkDivideMemoryNotEnoughException {
+    Map<IDeviceID, IWritableMemChunkGroup> topkMemTableMap = new HashMap<>();
+    for (IDeviceID key : memTableMap.keySet()) {
+      IWritableMemChunkGroup chunkGroupTemp = memTableMap.get(key).divide();
+      topkMemTableMap.put(key, chunkGroupTemp);
+    }
+    PrimitiveMemTable topkMemtable =
+        new PrimitiveMemTable(database, dataRegionId, topkMemTableMap, this);
+    memSize -= topkMemtable.memSize();
+    totalPointsNum -= topkMemtable.getTotalPointsNum();
+    tvListRamCost -= topkMemtable.getTVListsRamCost();
+    return topkMemtable;
+  }
+
   @Override
   public void addTVListRamCost(long cost) {
     this.tvListRamCost += cost;
@@ -689,6 +743,15 @@ public abstract class AbstractMemTable implements 
IMemTable {
     return latestTimeForEachDevice;
   }
 
+  @Override
+  public Map<String, Long> getTopKTime() {
+    Map<String, Long> latestTimeForEachDevice = new HashMap<>();
+    for (Entry<IDeviceID, IWritableMemChunkGroup> entry : 
memTableMap.entrySet()) {
+      latestTimeForEachDevice.put(entry.getKey().toStringID(), 
entry.getValue().getTopKTime());
+    }
+    return latestTimeForEachDevice;
+  }
+
   public static class Factory {
     private Factory() {
       // Empty constructor
@@ -734,4 +797,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
   public boolean isTotallyGeneratedByPipe() {
     return this.isTotallyGeneratedByPipe.get();
   }
+
+  public long getTotalPointsNumThreshold() {
+    return totalPointsNumThreshold;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 64a56ee3ac5..55ad4849a1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -253,11 +254,19 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     return null;
   }
 
+  @Override
+  public void setSchema(IMeasurementSchema s) {}
+
   @Override
   public long getMaxTime() {
     return list.getMaxTime();
   }
 
+  @Override
+  public long getTopKTime() {
+    return list.getTopKTime();
+  }
+
   @Override
   public synchronized TVList getSortedTvListForQuery() {
     sortTVList();
@@ -495,6 +504,11 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
         .getTimestamp();
   }
 
+  @Override
+  public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException {
+    return new AlignedWritableMemChunk(schemaList, (AlignedTVList) 
list.divide());
+  }
+
   @Override
   public boolean isEmpty() {
     return list.rowCount() == 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index 17b12a748da..875c1130692 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternUtil;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -143,6 +144,18 @@ public class AlignedWritableMemChunkGroup implements 
IWritableMemChunkGroup {
     return memChunk.getMaxTime();
   }
 
+  @Override
+  public long getTopKTime() {
+    return memChunk.getTopKTime();
+  }
+
+  @Override
+  public IWritableMemChunkGroup divide() throws 
TopkDivideMemoryNotEnoughException {
+    AlignedWritableMemChunkGroup topkMemChunkGroup = new 
AlignedWritableMemChunkGroup();
+    topkMemChunkGroup.memChunk = (AlignedWritableMemChunk) memChunk.divide();
+    return topkMemChunkGroup;
+  }
+
   public AlignedWritableMemChunk getAlignedMemChunk() {
     return memChunk;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 8e1a883d2a4..fc9a5424df4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
@@ -65,6 +66,10 @@ public interface IMemTable extends WALEntryValue {
   /** @return memory usage */
   long memSize();
 
+  void setMemSize(long m);
+
+  void setTotalPointsNum(long m);
+
   /** only used when mem control enabled */
   void addTVListRamCost(long cost);
 
@@ -132,6 +137,8 @@ public interface IMemTable extends WALEntryValue {
    */
   void delete(PartialPath path, PartialPath devicePath, long startTimestamp, 
long endTimestamp);
 
+  /** Divide the memtable into the topk and regular ones, and return the topk 
memtable. */
+  IMemTable divide() throws TopkDivideMemoryNotEnoughException;
   /**
    * Make a copy of this MemTable.
    *
@@ -185,4 +192,6 @@ public interface IMemTable extends WALEntryValue {
   void markAsNotGeneratedByPipe();
 
   boolean isTotallyGeneratedByPipe();
+
+  Map<String, Long> getTopKTime();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 4c07c3ece33..4b643cb35d1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -83,6 +84,8 @@ public interface IWritableMemChunk extends WALEntryValue {
 
   IMeasurementSchema getSchema();
 
+  void setSchema(IMeasurementSchema s);
+
   /**
    * served for read requests.
    *
@@ -128,6 +131,10 @@ public interface IWritableMemChunk extends WALEntryValue {
     return Long.MAX_VALUE;
   }
 
+  default long getTopKTime() {
+    return Long.MAX_VALUE;
+  }
+
   /** @return how many points are deleted */
   int delete(long lowerBound, long upperBound);
 
@@ -141,5 +148,7 @@ public interface IWritableMemChunk extends WALEntryValue {
 
   long getLastPoint();
 
+  IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException;
+
   boolean isEmpty();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
index ecaf8b2b738..eb3c1f0854d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
@@ -53,5 +54,9 @@ public interface IWritableMemChunkGroup extends WALEntryValue 
{
 
   long getCurrentTVListSize(String measurement);
 
+  IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException;
+
   long getMaxTime();
+
+  long getTopKTime();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
index 70b0f909381..cfdd9c256c1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
@@ -37,6 +37,14 @@ public class PrimitiveMemTable extends AbstractMemTable {
     super(database, dataRegionId, memTableMap);
   }
 
+  public PrimitiveMemTable(
+      String database,
+      String dataRegionId,
+      Map<IDeviceID, IWritableMemChunkGroup> memTableMap,
+      AbstractMemTable lastMemTable) {
+    super(database, dataRegionId, memTableMap, lastMemTable);
+  }
+
   @Override
   public IMemTable copy() {
     Map<IDeviceID, IWritableMemChunkGroup> newMap = new 
HashMap<>(getMemTableMap());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 953606ed136..fd8331b7cbe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -62,6 +62,7 @@ import 
org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -93,6 +94,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.GET_QUERY_RESOURCE_FROM_MEM;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.FLUSHING_MEMTABLE;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORKING_MEMTABLE;
+import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE;
 
 @SuppressWarnings("java:S1135") // ignore todos
 public class TsFileProcessor {
@@ -180,6 +182,8 @@ public class TsFileProcessor {
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
 
+  private File tsfileTemp;
+
   @SuppressWarnings("squid:S107")
   public TsFileProcessor(
       String storageGroupName,
@@ -324,6 +328,39 @@ public class TsFileProcessor {
                 dataRegionInfo.getDataRegion().getDatabaseName(),
                 dataRegionInfo.getDataRegion().getDataRegionId());
     walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
+    logger.info(
+        "Succeed to create {} memTable {}",
+        sequence ? "Sequence" : "Unsequence",
+        workMemTable.getMemTableId());
+  }
+
+  private void createNewWorkingMemTable(IMemTable tobeFlushed) throws 
WriteProcessException {
+    if (MEMTABLE_TOPK_SIZE != 0) {
+      try {
+        logger.info(
+            "Try to divide memTable {} with size {} and points {}",
+            tobeFlushed.getMemTableId(),
+            tobeFlushed.memSize(),
+            tobeFlushed.getTotalPointsNum());
+        workMemTable = tobeFlushed.divide();
+        walNode.onMemTableCreated(workMemTable, 
tsFileResource.getTsFilePath());
+        logger.info(
+            "Succeed to divide {} memTable {} with size {} and points {} to "
+                + "new workMemTable {} with size {} and points {} to",
+            sequence ? "Sequence" : "Unsequence",
+            tobeFlushed.getMemTableId(),
+            tobeFlushed.memSize(),
+            tobeFlushed.getTotalPointsNum(),
+            workMemTable.getMemTableId(),
+            workMemTable.memSize(),
+            workMemTable.getTotalPointsNum());
+      } catch (TopkDivideMemoryNotEnoughException e) {
+        logger.warn(e.getMessage());
+        createNewWorkingMemTable();
+      }
+    } else {
+      createNewWorkingMemTable();
+    }
   }
 
   /**
@@ -854,18 +891,19 @@ public class TsFileProcessor {
     try {
       if (logger.isDebugEnabled()) {
         if (workMemTable != null) {
-          logger.debug(
-              "{}: flush a working memtable in async close tsfile {}, memtable 
size: {}, tsfile "
-                  + "size: {}, plan index: [{}, {}], progress index: {}",
+          logger.info(
+              "{}: flush working memtable {} with size: {}, tsfile "
+                  + "size: {}, plan index: [{}, {}], progress index: {} in 
async close tsfile {}",
               storageGroupName,
-              tsFileResource.getTsFile().getAbsolutePath(),
+              workMemTable.getMemTableId(),
               workMemTable.memSize(),
               tsFileResource.getTsFileSize(),
               workMemTable.getMinPlanIndex(),
               workMemTable.getMaxPlanIndex(),
-              tsFileResource.getMaxProgressIndex());
+              tsFileResource.getMaxProgressIndex(),
+              tsFileResource.getTsFile().getAbsolutePath());
         } else {
-          logger.debug(
+          logger.info(
               "{}: flush a NotifyFlushMemTable in async close tsfile {}, 
tsfile size: {}",
               storageGroupName,
               tsFileResource.getTsFile().getAbsolutePath(),
@@ -899,6 +937,12 @@ public class TsFileProcessor {
         // When invoke closing TsFile after insert data to memTable, we 
shouldn't flush until invoke
         // flushing memTable in System module.
         Future<?> future = addAMemtableIntoFlushingList(tmpMemTable);
+        logger.info(
+            "Memtable {}: {} with size {} and points {} has been added to 
flushing list",
+            tmpMemTable.getMemTableId(),
+            tmpMemTable,
+            tmpMemTable.memSize(),
+            tmpMemTable.getTotalPointsNum());
         shouldClose = true;
         return future;
       } catch (Exception e) {
@@ -982,7 +1026,9 @@ public class TsFileProcessor {
         return;
       }
       logger.info(
-          "Async flush a memtable to tsfile: {}", 
tsFileResource.getTsFile().getAbsolutePath());
+          "Async flush a memtable {} to tsfile: {}",
+          workMemTable.getMemTableId(),
+          tsFileResource.getTsFile().getAbsolutePath());
       addAMemtableIntoFlushingList(workMemTable);
     } catch (Exception e) {
       logger.error(
@@ -1007,7 +1053,9 @@ public class TsFileProcessor {
   private Future<?> addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws 
IOException {
     Map<String, Long> lastTimeForEachDevice = new HashMap<>();
     if (sequence) {
-      lastTimeForEachDevice = tobeFlushed.getMaxTime();
+      // TODO: change how lastFlushTime is obtained
+      lastTimeForEachDevice = tobeFlushed.getTopKTime();
+      // lastTimeForEachDevice = tobeFlushed.getMaxTime();
       // If some devices have been removed in MemTable, the number of device 
in MemTable and
       // tsFileResource will not be the same. And the endTime of these devices 
in resource will be
       // Long.minValue.
@@ -1027,7 +1075,18 @@ public class TsFileProcessor {
     updateLatestFlushTimeCallback.call(this, lastTimeForEachDevice, 
lastWorkMemtableFlushTime);
 
     
SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
-    flushingMemTables.addLast(tobeFlushed);
+    // TODO: if sequence and k > 0, split and enqueue into separate waiting lists; else flush as before
+    if (MEMTABLE_TOPK_SIZE == 0) {
+      flushingMemTables.addLast(tobeFlushed);
+      workMemTable = null;
+    } else {
+      try {
+        createNewWorkingMemTable(tobeFlushed);
+      } catch (WriteProcessException e) {
+        throw new RuntimeException(e);
+      }
+      flushingMemTables.addLast(tobeFlushed);
+    }
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: {} Memtable (signal = {}) is added into the flushing Memtable, 
queue size = {}",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index c1a71d32a53..c750fb6db69 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -55,7 +56,13 @@ public class WritableMemChunk implements IWritableMemChunk {
     this.list = TVList.newList(schema.getType());
   }
 
-  private WritableMemChunk() {}
+  /**
+   * Creates a mem chunk around the given list.
+   *
+   * @param schema stored as this chunk's schema
+   * @param ls stored as this chunk's backing list; the reference is kept, the list is not copied
+   */
+  public WritableMemChunk(IMeasurementSchema schema, TVList ls) {
+    this.schema = schema;
+    this.list = ls;
+  }
+
+  /**
+   * No-argument constructor; it assigns no fields. It was {@code private} before this change and
+   * is now {@code public}.
+   */
+  public WritableMemChunk() {}
 
   @Override
   public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
@@ -251,11 +258,21 @@ public class WritableMemChunk implements 
IWritableMemChunk {
     return schema;
   }
 
+  /**
+   * Replaces the schema held by this chunk.
+   *
+   * @param s the schema to store; no validation is performed here
+   */
+  @Override
+  public void setSchema(IMeasurementSchema s) {
+    this.schema = s;
+  }
+
   @Override
   public long getMaxTime() {
     return list.getMaxTime();
   }
 
+  /**
+   * Returns the top-K time of the backing list.
+   *
+   * @return the value of {@code list.getTopKTime()}
+   */
+  @Override
+  public long getTopKTime() {
+    return list.getTopKTime();
+  }
+
   @Override
   public long getFirstPoint() {
     if (list.rowCount() == 0) {
@@ -274,6 +291,11 @@ public class WritableMemChunk implements IWritableMemChunk 
{
         .getTimestamp();
   }
 
+  /**
+   * Divides the backing list and wraps the list it returns in a new chunk that uses this chunk's
+   * schema.
+   *
+   * @return a new {@link WritableMemChunk} backed by the result of {@code list.divide()}
+   * @throws TopkDivideMemoryNotEnoughException propagated from {@code list.divide()}
+   */
+  @Override
+  public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException {
+    return new WritableMemChunk(this.schema, list.divide());
+  }
+
   @Override
   public boolean isEmpty() {
     return list.rowCount() == 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 707eace7fa5..dbe03f14c02 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternUtil;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -172,6 +173,25 @@ public class WritableMemChunkGroup implements 
IWritableMemChunkGroup {
     return maxTime;
   }
 
+  /**
+   * Returns the largest top-K time reported by the chunks of this group.
+   *
+   * @return the maximum of {@code getTopKTime()} over all chunks, or {@code Long.MIN_VALUE} when
+   *     the group holds no chunk
+   */
+  @Override
+  public long getTopKTime() {
+    long latestTopKTime = Long.MIN_VALUE;
+    for (IWritableMemChunk chunk : memChunkMap.values()) {
+      long candidate = chunk.getTopKTime();
+      if (candidate > latestTopKTime) {
+        latestTopKTime = candidate;
+      }
+    }
+    return latestTopKTime;
+  }
+
+  /**
+   * Divides every chunk of this group and collects the results in a new group under the same
+   * keys.
+   *
+   * <p>NOTE(review): chunks are divided one after another, and {@code WritableMemChunk.divide()}
+   * delegates to {@code TVList.divide()}, which modifies the list it is called on. If a later
+   * chunk throws, earlier chunks have already been divided while the new group is discarded —
+   * confirm that callers can tolerate this partial state.
+   *
+   * @return a new group holding the result of {@code divide()} for each chunk
+   * @throws TopkDivideMemoryNotEnoughException propagated from the first chunk that cannot be
+   *     divided
+   */
+  @Override
+  public IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException {
+    WritableMemChunkGroup topkMemChunkGroup = new WritableMemChunkGroup();
+    // Walk the entries so each chunk is read once instead of doing a second lookup per key.
+    for (Map.Entry<String, IWritableMemChunk> entry : memChunkMap.entrySet()) {
+      topkMemChunkGroup.memChunkMap.put(entry.getKey(), entry.getValue().divide());
+    }
+    return topkMemChunkGroup;
+  }
+
   @Override
   public int serializedSize() {
     int size = 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
index 33264fc46c1..4f1edd83ca1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
@@ -40,6 +40,7 @@ public class PrimitiveArrayManager {
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   public static final int ARRAY_SIZE = CONFIG.getPrimitiveArraySize();
+  public static final int MEMTABLE_TOPK_SIZE = CONFIG.getSeqMemtableTopKSize();
 
   public static final TVListSortAlgorithm TVLIST_SORT_ALGORITHM = 
CONFIG.getTvListSortAlgorithm();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 4092db7d442..ee451ac026f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -43,8 +43,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
-import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*;
 import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 import static 
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static 
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
@@ -647,6 +646,37 @@ public abstract class AlignedTVList extends TVList {
     }
   }
 
+  /**
+   * Moves the trailing timestamp, value and bitmap arrays of this list into a new list and
+   * truncates this list to {@code truncatedIndex} whole arrays.
+   *
+   * <p>NOTE(review): several points should be confirmed against the intended split semantics:
+   *
+   * <ul>
+   *   <li>the array at index {@code truncatedIndex} is neither moved to the new list nor counted
+   *       in the truncated {@code rowCount};
+   *   <li>{@code truncatedIndex} is 0 when {@code rowCount == MEMTABLE_TOPK_SIZE}, which makes
+   *       {@code timestamps.get(truncatedIndex - 1)} an out-of-range access;
+   *   <li>{@code rowCount}, {@code sorted} and {@code topKTime} of the returned list are not
+   *       assigned here;
+   *   <li>{@code bitMaps.get(colIndex)} is dereferenced without a null check.
+   * </ul>
+   *
+   * @return the new list holding the moved arrays
+   * @throws TopkDivideMemoryNotEnoughException if {@code MEMTABLE_TOPK_SIZE} is larger than the
+   *     row count or smaller than {@code ARRAY_SIZE}
+   */
+  @Override
+  public TVList divide() throws TopkDivideMemoryNotEnoughException {
+    if (MEMTABLE_TOPK_SIZE > rowCount || MEMTABLE_TOPK_SIZE < ARRAY_SIZE) {
+      // Arguments are listed in the order the message names them.
+      throw new TopkDivideMemoryNotEnoughException(
+          String.format(
+              "WARNING: MEMTABLE_TOPK_SIZE %d is bigger than "
+                  + "the TVList's row count %d, "
+                  + "or is smaller than ARRAY_SIZE %d",
+              MEMTABLE_TOPK_SIZE, rowCount, ARRAY_SIZE));
+    }
+    AlignedTVList topkTVList = AlignedTVList.newAlignedList(dataTypes);
+    int truncatedIndex = (rowCount - MEMTABLE_TOPK_SIZE + ARRAY_SIZE - 1) / ARRAY_SIZE;
+
+    // Share the trailing arrays with the new list (references are copied, not the data).
+    for (int i = truncatedIndex + 1; i <= rowCount / ARRAY_SIZE; i++) {
+      topkTVList.timestamps.add(timestamps.get(i));
+      for (int colIndex = 0; colIndex < values.size(); colIndex++) {
+        topkTVList.values.get(colIndex).add(values.get(colIndex).get(i));
+        topkTVList.bitMaps.get(colIndex).add(bitMaps.get(colIndex).get(i));
+      }
+    }
+    // Drop the moved arrays from THIS list. Removing them from topkTVList, as before, undid the
+    // move and indexed the new list with the size of the old one.
+    for (int i = rowCount / ARRAY_SIZE; i >= truncatedIndex + 1; i--) {
+      timestamps.remove(timestamps.size() - 1);
+      for (int colIndex = 0; colIndex < values.size(); colIndex++) {
+        values.get(colIndex).remove(values.get(colIndex).size() - 1);
+        bitMaps.get(colIndex).remove(bitMaps.get(colIndex).size() - 1);
+      }
+    }
+    rowCount = truncatedIndex * ARRAY_SIZE;
+    topKTime = timestamps.get(truncatedIndex - 1)[ARRAY_SIZE - 1];
+    return topkTVList;
+  }
   /**
    * Get the row index value in index column.
    *
@@ -749,6 +779,8 @@ public abstract class AlignedTVList extends TVList {
         checkExpansion();
       }
     }
+    sort(Math.max(0, start - MEMTABLE_TOPK_SIZE), end);
+    topKTime = Math.max(topKTime, getTime(Math.max(0, end - 
MEMTABLE_TOPK_SIZE)));
   }
 
   private void arrayCopy(Object[] value, int idx, int arrayIndex, int 
elementIndex, int remaining) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
index 6271a31734e..21783bc0370 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
@@ -53,7 +53,7 @@ public interface BackwardSort extends QuickSort {
 
   /**
    * check block-inversions to find the proper block_size, which is a multiple 
of array_size. For
-   * totally ordered, the block_size will equals to array_size For totally 
reverse ordered, the
+   * totally ordered, the block_size will equal to array_size For totally 
reverse ordered, the
    * block_size will equals to the rowCount. INVERSION_RATIOS_THRESHOLD=0.005 
is a empiric value.
    *
    * @param timestamps
@@ -132,6 +132,6 @@ public interface BackwardSort extends QuickSort {
    * @param hi
    */
   default void sortBlock(int lo, int hi) {
-    qsort(lo, hi);
+    qSort(lo, hi);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index c64dbc94fd8..a59fa4f0342 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -34,8 +34,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
-import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*;
 
 public abstract class IntTVList extends TVList {
   // list of primitive array, add 1 when expanded -> int primitive array
@@ -77,14 +76,59 @@ public abstract class IntTVList extends TVList {
+  /**
+   * Adds one point to this list.
+   *
+   * <p>When {@code MEMTABLE_TOPK_SIZE} is 0 the point is appended at the end. Otherwise the point
+   * is inserted in place: at most {@code min(rowCount, MEMTABLE_TOPK_SIZE)} trailing rows are
+   * compared against {@code timestamp}, rows with a larger timestamp are shifted one slot to the
+   * right, and the point is written into the freed slot.
+   *
+   * @param timestamp timestamp of the point
+   * @param value value of the point
+   */
   @Override
   public void putInt(long timestamp, int value) {
     checkExpansion();
-    int arrayIndex = rowCount / ARRAY_SIZE;
-    int elementIndex = rowCount % ARRAY_SIZE;
-    maxTime = Math.max(maxTime, timestamp);
-    timestamps.get(arrayIndex)[elementIndex] = timestamp;
-    values.get(arrayIndex)[elementIndex] = value;
-    rowCount++;
-    if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
-      sorted = false;
+    if (MEMTABLE_TOPK_SIZE == 0) {
+      // Plain append; topKTime tracks the largest timestamp seen, exactly like maxTime.
+      int arrayIndex = rowCount / ARRAY_SIZE;
+      int elementIndex = rowCount % ARRAY_SIZE;
+      maxTime = Math.max(maxTime, timestamp);
+      topKTime = Math.max(topKTime, timestamp);
+      timestamps.get(arrayIndex)[elementIndex] = timestamp;
+      values.get(arrayIndex)[elementIndex] = value;
+      rowCount++;
+      if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
+        sorted = false;
+      }
+    } else {
+      int arrayIndex;
+      int elementIndex;
+      // Number of trailing rows that may still be compared.
+      int waitlen = Math.min(rowCount, MEMTABLE_TOPK_SIZE);
+      int tempRowCount = rowCount - 1; // index where the backward comparison stops
+      // Walk backwards while the stored timestamp is larger than the new one.
+      while (waitlen > 0) {
+        waitlen--;
+        arrayIndex = tempRowCount / ARRAY_SIZE;
+        elementIndex = tempRowCount % ARRAY_SIZE;
+        if (timestamps.get(arrayIndex)[elementIndex] > timestamp) {
+          tempRowCount--;
+          continue;
+        }
+        break;
+      }
+      // Shift rows (tempRowCount, rowCount - 1] one slot to the right, starting from the end.
+      arrayIndex = rowCount / ARRAY_SIZE;
+      elementIndex = rowCount % ARRAY_SIZE;
+      int arrayIndexLeft = (rowCount - 1) / ARRAY_SIZE;
+      int elementIndexLeft = (rowCount - 1) % ARRAY_SIZE;
+      for (int i = rowCount - 1; i > tempRowCount; i--) {
+        timestamps.get(arrayIndex)[elementIndex] = 
timestamps.get(arrayIndexLeft)[elementIndexLeft];
+        values.get(arrayIndex)[elementIndex] = 
values.get(arrayIndexLeft)[elementIndexLeft];
+        arrayIndex = arrayIndexLeft;
+        elementIndex = elementIndexLeft;
+        arrayIndexLeft = (i - 1) / ARRAY_SIZE;
+        elementIndexLeft = (i - 1) % ARRAY_SIZE;
+        sortCount++;
+      }
+      // Write the new point into the freed slot right after the stop position.
+      arrayIndex = (tempRowCount + 1) / ARRAY_SIZE;
+      elementIndex = (tempRowCount + 1) % ARRAY_SIZE;
+      maxTime = Math.max(maxTime, timestamp);
+      timestamps.get(arrayIndex)[elementIndex] = timestamp;
+      values.get(arrayIndex)[elementIndex] = value;
+      // Once more than MEMTABLE_TOPK_SIZE rows existed before this insert, fold the timestamp at
+      // index rowCount - MEMTABLE_TOPK_SIZE - 1 into topKTime.
+      if (rowCount > MEMTABLE_TOPK_SIZE) {
+        arrayIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) / ARRAY_SIZE;
+        elementIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) % ARRAY_SIZE;
+        topKTime = Math.max(topKTime, 
timestamps.get(arrayIndex)[elementIndex]);
+      }
+      rowCount++;
+      // NOTE(review): waitlen is also 0 when the loop breaks on its last permitted comparison
+      // (for example when the list held a single row), so an in-order insert can clear `sorted`
+      // as well — confirm this is intended.
+      if (sorted && rowCount > 1 && waitlen == 0) {
+        sorted = false;
+      }
     }
   }
 
@@ -123,6 +167,37 @@ public abstract class IntTVList extends TVList {
     values.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
   }
 
+  /**
+   * Splits off the tail of this list. The split point {@code rowCount - MEMTABLE_TOPK_SIZE} is
+   * rounded down to an array boundary; every array from that boundary on is handed to a new list
+   * and removed from this one.
+   *
+   * @return the new list, marked sorted, whose {@code topKTime} is its first timestamp
+   * @throws TopkDivideMemoryNotEnoughException if {@code MEMTABLE_TOPK_SIZE} is larger than the
+   *     row count or smaller than {@code ARRAY_SIZE}
+   */
+  @Override
+  public TVList divide() throws TopkDivideMemoryNotEnoughException {
+    boolean topkFitsInList = MEMTABLE_TOPK_SIZE <= rowCount;
+    boolean topkCoversAnArray = MEMTABLE_TOPK_SIZE >= ARRAY_SIZE;
+    if (!(topkFitsInList && topkCoversAnArray)) {
+      throw new TopkDivideMemoryNotEnoughException(
+          String.format(
+              "WARNING: MEMTABLE_TOPK_SIZE %d is bigger than "
+                  + "the TVList's row count %d, "
+                  + "or is smaller than ARRAY_SIZE %d",
+              MEMTABLE_TOPK_SIZE, rowCount, ARRAY_SIZE));
+    }
+    IntTVList tail = IntTVList.newList();
+    // Integer division rounds the split point down to the start of its array.
+    int firstMovedArray = (rowCount - MEMTABLE_TOPK_SIZE) / ARRAY_SIZE;
+    int firstMovedRow = firstMovedArray * ARRAY_SIZE;
+    int arrayCount = timestamps.size();
+    // Hand the trailing arrays over by reference.
+    for (int arrayIdx = firstMovedArray; arrayIdx < arrayCount; arrayIdx++) {
+      tail.timestamps.add(timestamps.get(arrayIdx));
+      tail.values.add(values.get(arrayIdx));
+    }
+    // Then pop the same number of arrays off the end of this list.
+    for (int remaining = arrayCount - firstMovedArray; remaining > 0; remaining--) {
+      timestamps.remove(timestamps.size() - 1);
+      values.remove(values.size() - 1);
+    }
+    tail.rowCount = rowCount - firstMovedRow;
+    tail.sorted = true;
+    tail.topKTime = tail.getTime(0);
+    rowCount = firstMovedRow;
+    topKTime = tail.getTopKTime();
+    return tail;
+  }
+
   @Override
   public TimeValuePair getTimeValuePair(int index) {
     return new TimeValuePair(
@@ -159,47 +234,8 @@ public abstract class IntTVList extends TVList {
 
   @Override
   public void putInts(long[] time, int[] value, BitMap bitMap, int start, int 
end) {
-    checkExpansion();
-
-    int idx = start;
-    // constraint: time.length + timeIdxOffset == value.length
-    int timeIdxOffset = 0;
-    if (bitMap != null && !bitMap.isAllUnmarked()) {
-      // time array is a reference, should clone necessary time values
-      long[] clonedTime = new long[end - start];
-      System.arraycopy(time, start, clonedTime, 0, end - start);
-      time = clonedTime;
-      timeIdxOffset = start;
-      // drop null at the end of value array
-      int nullCnt =
-          dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, 
end, timeIdxOffset);
-      end -= nullCnt;
-    } else {
-      updateMaxTimeAndSorted(time, start, end);
-    }
-
-    while (idx < end) {
-      int inputRemaining = end - idx;
-      int arrayIdx = rowCount / ARRAY_SIZE;
-      int elementIdx = rowCount % ARRAY_SIZE;
-      int internalRemaining = ARRAY_SIZE - elementIdx;
-      if (internalRemaining >= inputRemaining) {
-        // the remaining inputs can fit the last array, copy all remaining 
inputs into last array
-        System.arraycopy(
-            time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx, 
inputRemaining);
-        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, 
inputRemaining);
-        rowCount += inputRemaining;
-        break;
-      } else {
-        // the remaining inputs cannot fit the last array, fill the last array 
and create a new
-        // one and enter the next loop
-        System.arraycopy(
-            time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx, 
internalRemaining);
-        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, 
internalRemaining);
-        idx += internalRemaining;
-        rowCount += internalRemaining;
-        checkExpansion();
-      }
+    for (int i = start; i < end; i++) {
+      putInt(time[i], value[i]);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
index 5e6a9dec2dc..d60067af576 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
@@ -30,11 +30,16 @@ public class QuickAlignedTVList extends AlignedTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  /**
+   * Sorts only the given index range by delegating to {@code qSort(lo, hi)}. Unlike {@code
+   * sort()}, it neither checks nor updates the {@code sorted} flag.
+   *
+   * @param lo first index of the range
+   * @param hi last index of the range; presumably inclusive, since {@code sort()} passes {@code
+   *     rowCount - 1} — confirm callers agree
+   */
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
index dcdcbc2cb59..ee4e6f9cbc8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
@@ -42,11 +42,16 @@ public class QuickBinaryTVList extends BinaryTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  /**
+   * Sorts only the given index range by delegating to {@code qSort(lo, hi)}. Unlike {@code
+   * sort()}, it neither checks nor updates the {@code sorted} flag.
+   *
+   * @param lo first index of the range
+   * @param hi last index of the range; presumably inclusive, since {@code sort()} passes {@code
+   *     rowCount - 1} — confirm callers agree
+   */
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
index e4fbfaefca2..c7b43470551 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
@@ -39,11 +39,16 @@ public class QuickBooleanTVList extends BooleanTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  /**
+   * Sorts only the given index range by delegating to {@code qSort(lo, hi)}. Unlike {@code
+   * sort()}, it neither checks nor updates the {@code sorted} flag.
+   *
+   * @param lo first index of the range
+   * @param hi last index of the range; presumably inclusive, since {@code sort()} passes {@code
+   *     rowCount - 1} — confirm callers agree
+   */
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
index 852844f9231..584c8f8f789 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
@@ -39,11 +39,16 @@ public class QuickDoubleTVList extends DoubleTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  /**
+   * Sorts only the given index range by delegating to {@code qSort(lo, hi)}. Unlike {@code
+   * sort()}, it neither checks nor updates the {@code sorted} flag.
+   *
+   * @param lo first index of the range
+   * @param hi last index of the range; presumably inclusive, since {@code sort()} passes {@code
+   *     rowCount - 1} — confirm callers agree
+   */
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
index 409a00a093e..90485257276 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
@@ -39,11 +39,16 @@ public class QuickFloatTVList extends FloatTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  /**
+   * Sorts only the given index range by delegating to {@code qSort(lo, hi)}. Unlike {@code
+   * sort()}, it neither checks nor updates the {@code sorted} flag.
+   *
+   * @param lo first index of the range
+   * @param hi last index of the range; presumably inclusive, since {@code sort()} passes {@code
+   *     rowCount - 1} — confirm callers agree
+   */
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
index bc44cf79053..d24842babac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
@@ -22,11 +22,16 @@ public class QuickIntTVList extends IntTVList implements 
QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  /**
+   * Sorts only the given index range by delegating to {@code qSort(lo, hi)}. Unlike {@code
+   * sort()}, it neither checks nor updates the {@code sorted} flag.
+   *
+   * @param lo first index of the range
+   * @param hi last index of the range; presumably inclusive, since {@code sort()} passes {@code
+   *     rowCount - 1} — confirm callers agree
+   */
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   public void swap(int p, int q) {
     int valueP = getInt(p);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
index 16d8fab3606..3d4f1ab1a7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
@@ -39,11 +39,16 @@ public class QuickLongTVList extends LongTVList implements 
QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  /**
+   * Sorts only the given index range by delegating to {@code qSort(lo, hi)}. Unlike {@code
+   * sort()}, it neither checks nor updates the {@code sorted} flag.
+   *
+   * @param lo first index of the range
+   * @param hi last index of the range; presumably inclusive, since {@code sort()} passes {@code
+   *     rowCount - 1} — confirm callers agree
+   */
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
index 8229781aa05..9bad94d4f9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
@@ -58,7 +58,7 @@ public interface QuickSort {
   //        }
   //    }
 
-  default void qsort(int lo, int hi) {
+  default void qSort(int lo, int hi) {
     if (lo < hi) {
       // TODO: use insertion sort in smaller array
       // if(hi - lo <= 32) {
@@ -66,8 +66,8 @@ public interface QuickSort {
       // }
       // partition
       int pivotIndex = partition(lo, hi);
-      qsort(lo, pivotIndex - 1);
-      qsort(pivotIndex + 1, hi);
+      qSort(lo, pivotIndex - 1);
+      qSort(pivotIndex + 1, hi);
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index f2b26158a5f..8430df0b317 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
+import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE;
 import static 
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static 
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
 
@@ -56,6 +57,8 @@ public abstract class TVList implements WALEntryValue {
 
   protected boolean sorted = true;
   protected long maxTime;
+  protected long topKTime;
+  protected int sortCount;
   // record reference count of this tv list
   // currently this reference will only be increase because we can't know when 
to decrease it
   protected AtomicInteger referenceCount;
@@ -64,7 +67,9 @@ public abstract class TVList implements WALEntryValue {
   protected TVList() {
     timestamps = new ArrayList<>();
     rowCount = 0;
+    sortCount = 0;
     maxTime = Long.MIN_VALUE;
+    topKTime = Long.MIN_VALUE;
     referenceCount = new AtomicInteger();
   }
 
@@ -107,6 +112,8 @@ public abstract class TVList implements WALEntryValue {
 
   public abstract void sort();
 
+  public abstract void sort(int lo, int hi);
+
   public void increaseReferenceCount() {
     referenceCount.incrementAndGet();
   }
@@ -212,6 +219,10 @@ public abstract class TVList implements WALEntryValue {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  /**
+   * Base implementation of the divide operation; it always throws. List types that support
+   * dividing override it.
+   *
+   * @return never returns normally
+   * @throws UnsupportedOperationException always, carrying {@code ERR_DATATYPE_NOT_CONSISTENT}
+   * @throws TopkDivideMemoryNotEnoughException never thrown here; declared for overriding
+   *     implementations
+   */
+  public TVList divide() throws TopkDivideMemoryNotEnoughException {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   public Object getAlignedValue(int index) {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
@@ -229,6 +240,10 @@ public abstract class TVList implements WALEntryValue {
     return maxTime;
   }
 
+  /**
+   * Returns the current top-K time of this list.
+   *
+   * @return the {@code topKTime} field, which the constructor initialises to {@code
+   *     Long.MIN_VALUE}
+   */
+  public long getTopKTime() {
+    return topKTime;
+  }
+
   public long getVersion() {
     return version;
   }
@@ -328,6 +343,7 @@ public abstract class TVList implements WALEntryValue {
     for (int i = start; i < end; i++) {
       inPutMinTime = Math.min(inPutMinTime, time[i]);
       maxTime = Math.max(maxTime, time[i]);
+      topKTime = Math.max(topKTime, time[Math.max(0, i - MEMTABLE_TOPK_SIZE)]);
       if (inputSorted && i < length - 1 && time[i] > time[i + 1]) {
         inputSorted = false;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
index 7c6fa7f92ef..819b00e3c93 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
@@ -50,7 +50,7 @@ public class TimAlignedTVList extends AlignedTVList 
implements TimSort {
           (int[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      this.timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -58,7 +58,12 @@ public class TimAlignedTVList extends AlignedTVList 
implements TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    // Range sort: forwards the bounds unchanged to timSort. sort() calls timSort(0, rowCount),
+    // so hi is presumably exclusive here — confirm callers agree.
+    // NOTE(review): sort() sizes its scratch arrays before calling timSort; this overload does
+    // not — confirm timSort does not depend on them.
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
index 03760fd14c8..fccb64a1b2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
@@ -44,14 +44,19 @@ public class TimBinaryTVList extends BinaryTVList 
implements TimSort {
       sortedValues =
           (Binary[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.TEXT, rowCount);
     }
-    sort(0, rowCount);
+    timSort(0, rowCount);
     clearSortedValue();
     clearSortedTime();
     sorted = true;
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    // Range sort: forwards the bounds unchanged to timSort. sort() calls timSort(0, rowCount),
+    // so hi is presumably exclusive here — confirm callers agree.
+    // NOTE(review): sort() sizes its scratch arrays before calling timSort; this overload does
+    // not — confirm timSort does not depend on them.
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
index 22b56475905..04d43f30bb9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
@@ -43,7 +43,7 @@ public class TimBooleanTVList extends BooleanTVList 
implements TimSort {
           (boolean[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.BOOLEAN, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -51,7 +51,12 @@ public class TimBooleanTVList extends BooleanTVList 
implements TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    // Range sort: forwards the bounds unchanged to timSort. sort() calls timSort(0, rowCount),
+    // so hi is presumably exclusive here — confirm callers agree.
+    // NOTE(review): sort() sizes its scratch arrays before calling timSort; this overload does
+    // not — confirm timSort does not depend on them.
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
index 3c0aa823798..543a4e2ec29 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
@@ -43,7 +43,7 @@ public class TimDoubleTVList extends DoubleTVList implements 
TimSort {
           (double[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.DOUBLE, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -51,7 +51,12 @@ public class TimDoubleTVList extends DoubleTVList implements 
TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    // Range sort: forwards the bounds unchanged to timSort. sort() calls timSort(0, rowCount),
+    // so hi is presumably exclusive here — confirm callers agree.
+    // NOTE(review): sort() sizes its scratch arrays before calling timSort; this overload does
+    // not — confirm timSort does not depend on them.
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
index 2b1fb440a95..d360959a81e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
@@ -44,7 +44,7 @@ public class TimFloatTVList extends FloatTVList implements 
TimSort {
           (float[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.FLOAT, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -52,7 +52,12 @@ public class TimFloatTVList extends FloatTVList implements 
TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    // Range sort: forwards the bounds unchanged to timSort. sort() calls timSort(0, rowCount),
+    // so hi is presumably exclusive here — confirm callers agree.
+    // NOTE(review): sort() sizes its scratch arrays before calling timSort; this overload does
+    // not — confirm timSort does not depend on them.
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
index b3ae939892d..d2f153e05a1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
@@ -33,18 +33,22 @@ public class TimIntTVList extends IntTVList implements 
TimSort {
 
   @Override
   public void sort() {
+    sort(0, rowCount);
+  }
+
+  @Override
+  public void sort(int lo, int hi) {
+    int len = hi - lo;
     if (sortedTimestamps == null
-        || sortedTimestamps.length < 
PrimitiveArrayManager.getArrayRowCount(rowCount)) {
+        || sortedTimestamps.length < 
PrimitiveArrayManager.getArrayRowCount(len)) {
       sortedTimestamps =
-          (long[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount);
+          (long[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, len);
     }
-    if (sortedValues == null
-        || sortedValues.length < 
PrimitiveArrayManager.getArrayRowCount(rowCount)) {
-      sortedValues =
-          (int[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount);
+    if (sortedValues == null || sortedValues.length < 
PrimitiveArrayManager.getArrayRowCount(len)) {
+      sortedValues = (int[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, len);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, len);
     }
     clearSortedValue();
     clearSortedTime();
@@ -52,7 +56,7 @@ public class TimIntTVList extends IntTVList implements 
TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
index e3a02e41db5..9096cd47e1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
@@ -43,7 +43,7 @@ public class TimLongTVList extends LongTVList implements 
TimSort {
           (long[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -51,7 +51,12 @@ public class TimLongTVList extends LongTVList implements 
TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
index 63584429d4a..0426fdafe66 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
@@ -27,7 +27,7 @@ public interface TimSort {
   int SMALL_ARRAY_LENGTH = 32;
 
   /** the same as the 'set' function in TVList, the reason is to avoid two 
equal functions. */
-  void tim_set(int src, int dest);
+  void timSet(int src, int dest);
 
   void setFromSorted(int src, int dest);
 
@@ -55,7 +55,7 @@ public interface TimSort {
    * the entrance of tim_sort; 1. array_size <= 32, use binary sort. 2. 
recursively invoke merge
    * sort.
    */
-  default void sort(int lo, int hi) {
+  default void timSort(int lo, int hi) {
     if (lo == hi) {
       return;
     }
@@ -65,8 +65,8 @@ public interface TimSort {
       return;
     }
     int mid = (lo + hi) >>> 1;
-    sort(lo, mid);
-    sort(mid, hi);
+    timSort(lo, mid);
+    timSort(mid, hi);
     merge(lo, mid, hi);
   }
 
@@ -127,7 +127,7 @@ public interface TimSort {
        */
       int n = start - left; // The number of elements to move
       for (int i = n; i >= 1; i--) {
-        tim_set(left + i - 1, left + i);
+        timSet(left + i - 1, left + i);
       }
       setPivotTo(left);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
new file mode 100644
index 00000000000..e1d8d727ec6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.utils.datastructure;
+
+public class TopkDivideMemoryNotEnoughException extends Exception {
+
+  public TopkDivideMemoryNotEnoughException(String message) {
+    super(message);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index eb959726c5c..c9d3a67f323 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -464,7 +464,7 @@ data_replication_factor=1
 # Add a switch to enable separate sequence and unsequence data.
 # If it is true, then data will be separated into seq and unseq data dir. If 
it is false, then all data will be written into unseq data dir.
 # Datatype: boolean
-# enable_separate_data=true
+enable_separate_data=false
 
 # What will the system do when unrecoverable error occurs.
 # Datatype: String
@@ -509,9 +509,14 @@ data_replication_factor=1
 # BACKWARD: backward sort
 # tvlist_sort_algorithm=TIM
 
+# The number of points that stay in the next tvlist
+# The default waiting size is 0
+# Datatype: int
+seq_memtable_topk_size=0
+
 # When the average point number of timeseries in memtable exceeds this, the 
memtable is flushed to disk. The default threshold is 100000.
 # Datatype: int
-# avg_series_point_number_threshold=100000
+avg_series_point_number_threshold=100000
 
 # How many threads can concurrently flush. When <= 0, use CPU core number.
 # Datatype: int
@@ -544,15 +549,15 @@ data_replication_factor=1
 ####################
 # sequence space compaction: only compact the sequence files
 # Datatype: boolean
-# enable_seq_space_compaction=true
+enable_seq_space_compaction=false
 
 # unsequence space compaction: only compact the unsequence files
 # Datatype: boolean
-# enable_unseq_space_compaction=true
+enable_unseq_space_compaction=false
 
 # cross space compaction: compact the unsequence files into the overlapped 
sequence files
 # Datatype: boolean
-# enable_cross_space_compaction=true
+enable_cross_space_compaction=false
 
 # the selector of cross space compaction task
 # Options: rewrite
@@ -644,7 +649,7 @@ data_replication_factor=1
 
 # The interval of compaction task schedule
 # Datatype: long, Unit: ms
-# compaction_schedule_interval_in_ms=60000
+compaction_schedule_interval_in_ms=600000000
 
 # The interval of compaction task submission
 # Datatype: long, Unit: ms

Reply via email to