This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch research/deferred_strategy
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 64c5f9ba847aff2169a8b50626dbd1857635bfc9
Author: LittleHealth <[email protected]>
AuthorDate: Sun Oct 15 00:27:38 2023 +0800

    [Memtable] implement the deferred strategy
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   7 +-
 .../db/storageengine/dataregion/DataRegion.java    |   8 +-
 .../dataregion/flush/MemTableFlushTask.java        |  13 ++-
 .../dataregion/memtable/AbstractMemTable.java      |  64 ++++++++++++-
 .../memtable/AlignedWritableMemChunk.java          |  14 +++
 .../memtable/AlignedWritableMemChunkGroup.java     |  13 +++
 .../dataregion/memtable/IMemTable.java             |   9 ++
 .../dataregion/memtable/IWritableMemChunk.java     |   9 ++
 .../memtable/IWritableMemChunkGroup.java           |   5 ++
 .../dataregion/memtable/PrimitiveMemTable.java     |   7 +-
 .../dataregion/memtable/TsFileProcessor.java       |  81 +++++++++++++++--
 .../dataregion/memtable/WritableMemChunk.java      |  24 ++++-
 .../dataregion/memtable/WritableMemChunkGroup.java |  20 +++++
 .../rescon/memory/PrimitiveArrayManager.java       |   1 +
 .../db/utils/datastructure/AlignedTVList.java      |  36 +++++++-
 .../iotdb/db/utils/datastructure/BackwardSort.java |   4 +-
 .../iotdb/db/utils/datastructure/IntTVList.java    | 100 ++++++++++++++++++---
 .../db/utils/datastructure/QuickAlignedTVList.java |   7 +-
 .../db/utils/datastructure/QuickBinaryTVList.java  |   7 +-
 .../db/utils/datastructure/QuickBooleanTVList.java |   7 +-
 .../db/utils/datastructure/QuickDoubleTVList.java  |   7 +-
 .../db/utils/datastructure/QuickFloatTVList.java   |   7 +-
 .../db/utils/datastructure/QuickIntTVList.java     |   7 +-
 .../db/utils/datastructure/QuickLongTVList.java    |   7 +-
 .../iotdb/db/utils/datastructure/QuickSort.java    |   6 +-
 .../iotdb/db/utils/datastructure/TVList.java       |  16 ++++
 .../db/utils/datastructure/TimAlignedTVList.java   |   9 +-
 .../db/utils/datastructure/TimBinaryTVList.java    |   9 +-
 .../db/utils/datastructure/TimBooleanTVList.java   |   9 +-
 .../db/utils/datastructure/TimDoubleTVList.java    |   9 +-
 .../db/utils/datastructure/TimFloatTVList.java     |   9 +-
 .../iotdb/db/utils/datastructure/TimIntTVList.java |  20 +++--
 .../db/utils/datastructure/TimLongTVList.java      |   9 +-
 .../iotdb/db/utils/datastructure/TimSort.java      |  10 +--
 .../TopkDivideMemoryNotEnoughException.java        |   8 ++
 .../resources/conf/iotdb-common.properties         |   5 ++
 36 files changed, 511 insertions(+), 72 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7850bf6b103..c534cb1f715 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -419,6 +419,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "tvlist_sort_algorithm", 
conf.getTvListSortAlgorithm().toString())));
 
+    conf.setSeqMemtableTopKSize(
+        (Integer.parseInt(
+            properties.getProperty(
+                "seq_memtable_topk_size", 
Integer.toString(conf.getSeqMemtableTopKSize())))));
+
     conf.setAvgSeriesPointNumberThreshold(
         Integer.parseInt(
             properties.getProperty(
@@ -1704,7 +1709,7 @@ public class IoTDBDescriptor {
     logger.info("initial allocateMemoryForWrite = {}", 
conf.getAllocateMemoryForStorageEngine());
     logger.info("initial allocateMemoryForSchema = {}", 
conf.getAllocateMemoryForSchema());
     logger.info("initial allocateMemoryForConsensus = {}", 
conf.getAllocateMemoryForConsensus());
-
+    logger.info("initial seqMemtableTopKSize = {}", 
conf.getSeqMemtableTopKSize());
     initSchemaMemoryAllocate(properties);
     initStorageEngineAllocate(properties);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 75609c20f93..f33f19557f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1267,7 +1267,6 @@ public class DataRegion implements IDataRegionForQuery {
             version,
             0,
             0);
-
     return getTsFileProcessor(sequence, filePath, timePartitionId);
   }
 
@@ -1393,6 +1392,7 @@ public class DataRegion implements IDataRegionForQuery {
       if 
(!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
         
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
       }
+      logger.info("close an unsequence tsfile processor {}", databaseName + 
"-" + dataRegionId);
     }
   }
 
@@ -1557,7 +1557,8 @@ public class DataRegion implements IDataRegionForQuery {
       for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
         if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
           logger.info(
-              "Exceed sequence memtable flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
+              "Exceed sequence memtable {} flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
+              tsFileProcessor.getWorkMemTable().getMemTableId(),
               tsFileProcessor.getTimeRangeId(),
               databaseName,
               dataRegionId);
@@ -1580,7 +1581,8 @@ public class DataRegion implements IDataRegionForQuery {
       for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
         if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
           logger.info(
-              "Exceed unsequence memtable flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
+              "Exceed unsequence memtable {} flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
+              tsFileProcessor.getWorkMemTable().getMemTableId(),
               tsFileProcessor.getTimeRangeId(),
               databaseName,
               dataRegionId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 19450cbe9fc..aa84bface67 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -111,7 +111,8 @@ public class MemTableFlushTask {
             ? 0
             : memTable.getTotalPointsNum() / memTable.getSeriesNumber();
     LOGGER.info(
-        "The memTable size of SG {} is {}, the avg series points num in chunk 
is {}, total timeseries number is {}",
+        "The memTable {} size of SG {} is {}, the avg series points num in 
chunk is {}, total timeseries number is {}",
+        memTable.getMemTableId(),
         storageGroup,
         memTable.memSize(),
         avgSeriesPointsNum,
@@ -215,8 +216,9 @@ public class MemTableFlushTask {
             "flush");
 
     LOGGER.info(
-        "Database {} memtable {} flushing a memtable has finished! Time 
consumption: {}ms",
+        "Database {} flushing memtable {} with {} has finished! Time 
consumption: {}ms",
         storageGroup,
+        memTable.getMemTableId(),
         memTable,
         System.currentTimeMillis() - start);
   }
@@ -286,8 +288,13 @@ public class MemTableFlushTask {
           recordFlushPointsMetric();
 
           LOGGER.info(
-              "Database {}, flushing memtable {} into disk: Encoding data cost 
" + "{} ms.",
+              "Database {}, flushing memtable {} with size {} and points {} in 
file "
+                  + "{} into disk: Encoding data cost "
+                  + "{} ms.",
               storageGroup,
+              memTable.getMemTableId(),
+              memTable.memSize(),
+              memTable.getTotalPointsNum(),
               writer.getFile().getName(),
               memSerializeTime);
           WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING, 
memSerializeTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index bdba167c5bb..3af36c60209 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.MemUtils;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -85,24 +86,48 @@ public abstract class AbstractMemTable implements IMemTable 
{
 
   private long totalPointsNum = 0;
 
+  public long getTotalPointsNumThreshold() {
+    return totalPointsNumThreshold;
+  }
+
   private long totalPointsNumThreshold = 0;
 
   private long maxPlanIndex = Long.MIN_VALUE;
 
   private long minPlanIndex = Long.MAX_VALUE;
 
-  private final long memTableId = memTableIdCounter.incrementAndGet();
+  private long memTableId;
 
-  private final long createdTime = System.currentTimeMillis();
+  private long createdTime;
 
   private static final String METRIC_POINT_IN = "pointsIn";
 
   protected AbstractMemTable() {
     this.memTableMap = new HashMap<>();
+    createdTime = System.currentTimeMillis();
+    memTableId = memTableIdCounter.incrementAndGet();
   }
 
-  protected AbstractMemTable(Map<IDeviceID, IWritableMemChunkGroup> 
memTableMap) {
+  protected AbstractMemTable(
+      Map<IDeviceID, IWritableMemChunkGroup> memTableMap, AbstractMemTable 
lastMemTable) {
     this.memTableMap = memTableMap;
+    for (IDeviceID key : memTableMap.keySet()) {
+      Map<String, IWritableMemChunk> memChunkMap = 
memTableMap.get(key).getMemChunkMap();
+      for (String schema : memChunkMap.keySet()) {
+        IWritableMemChunk memChunk = memChunkMap.get(schema);
+        totalPointsNum += memChunk.getTVList().rowCount();
+        memSize +=
+            (long) memChunk.getTVList().rowCount()
+                * memChunk.getSchema().getType().getDataTypeSize();
+      }
+    }
+    totalPointsNumThreshold = lastMemTable.getTotalPointsNumThreshold();
+    createdTime = System.currentTimeMillis();
+    memTableId = memTableIdCounter.incrementAndGet();
+    seriesNumber = lastMemTable.getSeriesNumber();
+    tvListRamCost = lastMemTable.getTVListsRamCost() * totalPointsNum / 
lastMemTable.totalPointsNum;
+    shouldFlush = false;
+    flushStatus = FlushStatus.WORKING;
   }
 
   @Override
@@ -402,6 +427,16 @@ public abstract class AbstractMemTable implements 
IMemTable {
     return memSize;
   }
 
+  @Override
+  public void setMemSize(long m) {
+    memSize = memSize + m;
+  }
+
+  @Override
+  public void setTotalPointsNum(long m) {
+    totalPointsNum = totalPointsNum + m;
+  }
+
   @Override
   public boolean reachTotalPointNumThreshold() {
     if (totalPointsNum == 0) {
@@ -489,6 +524,20 @@ public abstract class AbstractMemTable implements 
IMemTable {
     }
   }
 
+  @Override
+  public IMemTable divide() throws TopkDivideMemoryNotEnoughException {
+    Map<IDeviceID, IWritableMemChunkGroup> topkMemTableMap = new HashMap<>();
+    for (IDeviceID key : memTableMap.keySet()) {
+      IWritableMemChunkGroup chunkGroupTemp = memTableMap.get(key).divide();
+      topkMemTableMap.put(key, chunkGroupTemp);
+    }
+    PrimitiveMemTable topkMemtable = new PrimitiveMemTable(topkMemTableMap, 
this);
+    memSize -= topkMemtable.memSize();
+    totalPointsNum -= topkMemtable.getTotalPointsNum();
+    tvListRamCost -= topkMemtable.getTVListsRamCost();
+    return topkMemtable;
+  }
+
   @Override
   public void addTVListRamCost(long cost) {
     this.tvListRamCost += cost;
@@ -634,6 +683,15 @@ public abstract class AbstractMemTable implements 
IMemTable {
     return latestTimeForEachDevice;
   }
 
+  @Override
+  public Map<String, Long> getTopKTime() {
+    Map<String, Long> latestTimeForEachDevice = new HashMap<>();
+    for (Entry<IDeviceID, IWritableMemChunkGroup> entry : 
memTableMap.entrySet()) {
+      latestTimeForEachDevice.put(entry.getKey().toStringID(), 
entry.getValue().getTopKTime());
+    }
+    return latestTimeForEachDevice;
+  }
+
   public static class Factory {
     private Factory() {
       // Empty constructor
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index aa4b3c4d6ed..1046ec0e296 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferVie
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -251,11 +252,19 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     return null;
   }
 
+  @Override
+  public void setSchema(IMeasurementSchema s) {}
+
   @Override
   public long getMaxTime() {
     return list.getMaxTime();
   }
 
+  @Override
+  public long getTopKTime() {
+    return list.getTopKTime();
+  }
+
   @Override
   public synchronized TVList getSortedTvListForQuery() {
     sortTVList();
@@ -464,6 +473,11 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
         .getTimestamp();
   }
 
+  @Override
+  public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException {
+    return new AlignedWritableMemChunk(schemaList, (AlignedTVList) 
list.divide());
+  }
+
   @Override
   public boolean isEmpty() {
     return list.rowCount() == 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index 17b12a748da..875c1130692 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternUtil;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -143,6 +144,18 @@ public class AlignedWritableMemChunkGroup implements 
IWritableMemChunkGroup {
     return memChunk.getMaxTime();
   }
 
+  @Override
+  public long getTopKTime() {
+    return memChunk.getTopKTime();
+  }
+
+  @Override
+  public IWritableMemChunkGroup divide() throws 
TopkDivideMemoryNotEnoughException {
+    AlignedWritableMemChunkGroup topkMemChunkGroup = new 
AlignedWritableMemChunkGroup();
+    topkMemChunkGroup.memChunk = (AlignedWritableMemChunk) memChunk.divide();
+    return topkMemChunkGroup;
+  }
+
   public AlignedWritableMemChunk getAlignedMemChunk() {
     return memChunk;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index b4b0204080d..1f898c05a9c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
@@ -64,6 +65,10 @@ public interface IMemTable extends WALEntryValue {
   /** @return memory usage */
   long memSize();
 
+  void setMemSize(long m);
+
+  void setTotalPointsNum(long m);
+
   /** only used when mem control enabled */
   void addTVListRamCost(long cost);
 
@@ -128,6 +133,8 @@ public interface IMemTable extends WALEntryValue {
    */
   void delete(PartialPath path, PartialPath devicePath, long startTimestamp, 
long endTimestamp);
 
+  /** divide the memtable into the topk and regular ones @ return the topk 
memtable. */
+  IMemTable divide() throws TopkDivideMemoryNotEnoughException;
   /**
    * Make a copy of this MemTable.
    *
@@ -169,4 +176,6 @@ public interface IMemTable extends WALEntryValue {
   void setFlushStatus(FlushStatus flushStatus);
 
   Map<String, Long> getMaxTime();
+
+  Map<String, Long> getTopKTime();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 4c07c3ece33..4b643cb35d1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -83,6 +84,8 @@ public interface IWritableMemChunk extends WALEntryValue {
 
   IMeasurementSchema getSchema();
 
+  void setSchema(IMeasurementSchema s);
+
   /**
    * served for read requests.
    *
@@ -128,6 +131,10 @@ public interface IWritableMemChunk extends WALEntryValue {
     return Long.MAX_VALUE;
   }
 
+  default long getTopKTime() {
+    return Long.MAX_VALUE;
+  }
+
   /** @return how many points are deleted */
   int delete(long lowerBound, long upperBound);
 
@@ -141,5 +148,7 @@ public interface IWritableMemChunk extends WALEntryValue {
 
   long getLastPoint();
 
+  IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException;
+
   boolean isEmpty();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
index ecaf8b2b738..eb3c1f0854d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
@@ -53,5 +54,9 @@ public interface IWritableMemChunkGroup extends WALEntryValue 
{
 
   long getCurrentTVListSize(String measurement);
 
+  IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException;
+
   long getMaxTime();
+
+  long getTopKTime();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
index d4bf9c18743..70f29dd0920 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
@@ -30,15 +30,16 @@ public class PrimitiveMemTable extends AbstractMemTable {
     this.disableMemControl = !enableMemControl;
   }
 
-  public PrimitiveMemTable(Map<IDeviceID, IWritableMemChunkGroup> memTableMap) 
{
-    super(memTableMap);
+  public PrimitiveMemTable(
+      Map<IDeviceID, IWritableMemChunkGroup> memTableMap, AbstractMemTable 
lastMemtable) {
+    super(memTableMap, lastMemtable);
   }
 
   @Override
   public IMemTable copy() {
     Map<IDeviceID, IWritableMemChunkGroup> newMap = new 
HashMap<>(getMemTableMap());
 
-    return new PrimitiveMemTable(newMap);
+    return new PrimitiveMemTable(newMap, this);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 3cd09e73e92..28ca6cdac34 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -62,6 +62,7 @@ import 
org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -91,6 +92,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.GET_QUERY_RESOURCE_FROM_MEM;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.FLUSHING_MEMTABLE;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORKING_MEMTABLE;
+import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE;
 
 @SuppressWarnings("java:S1135") // ignore todos
 public class TsFileProcessor {
@@ -114,6 +116,8 @@ public class TsFileProcessor {
 
   /** sync this object in read() and asyncTryToFlush(). */
   private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new 
ConcurrentLinkedDeque<>();
+  // private final ConcurrentLinkedDeque<IMemTable> waitingMemTables = new
+  // ConcurrentLinkedDeque<>();
 
   /** modification to memtable mapping. */
   private final List<Pair<Modification, IMemTable>> modsToMemtable = new 
ArrayList<>();
@@ -121,11 +125,14 @@ public class TsFileProcessor {
   /** writer for restore tsfile and flushing. */
   private RestorableTsFileIOWriter writer;
 
+  private RestorableTsFileIOWriter writerTemp;
+
   /** tsfile resource for index this tsfile. */
   private final TsFileResource tsFileResource;
 
   /** time range index to indicate this processor belongs to which time range 
*/
   private long timeRangeId;
+
   /**
    * Whether the processor is in the queue of the FlushManager or being 
flushed by a flush thread.
    */
@@ -175,6 +182,8 @@ public class TsFileProcessor {
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
 
+  private File tsfileTemp;
+
   @SuppressWarnings("squid:S107")
   public TsFileProcessor(
       String storageGroupName,
@@ -308,6 +317,39 @@ public class TsFileProcessor {
   private void createNewWorkingMemTable() throws WriteProcessException {
     workMemTable = 
MemTableManager.getInstance().getAvailableMemTable(storageGroupName);
     walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
+    logger.info(
+        "Succeed to create {} memTable {}",
+        sequence ? "Sequence" : "Unsequence",
+        workMemTable.getMemTableId());
+  }
+
+  private void createNewWorkingMemTable(IMemTable tobeFlushed) throws 
WriteProcessException {
+    if (MEMTABLE_TOPK_SIZE != 0) {
+      try {
+        logger.info(
+            "Try to divide memTable {} with size {} and points {}",
+            tobeFlushed.getMemTableId(),
+            tobeFlushed.memSize(),
+            tobeFlushed.getTotalPointsNum());
+        workMemTable = tobeFlushed.divide();
+        walNode.onMemTableCreated(workMemTable, 
tsFileResource.getTsFilePath());
+        logger.info(
+            "Succeed to divide {} memTable {} with size {} and points {} to "
+                + "new workMemTable {} with size {} and points {} to",
+            sequence ? "Sequence" : "Unsequence",
+            tobeFlushed.getMemTableId(),
+            tobeFlushed.memSize(),
+            tobeFlushed.getTotalPointsNum(),
+            workMemTable.getMemTableId(),
+            workMemTable.memSize(),
+            workMemTable.getTotalPointsNum());
+      } catch (TopkDivideMemoryNotEnoughException e) {
+        logger.warn(e.getMessage());
+        createNewWorkingMemTable();
+      }
+    } else {
+      createNewWorkingMemTable();
+    }
   }
 
   /**
@@ -858,15 +900,16 @@ public class TsFileProcessor {
       if (logger.isInfoEnabled()) {
         if (workMemTable != null) {
           logger.info(
-              "{}: flush a working memtable in async close tsfile {}, memtable 
size: {}, tsfile "
-                  + "size: {}, plan index: [{}, {}], progress index: {}",
+              "{}: flush working memtable {} with size: {}, tsfile "
+                  + "size: {}, plan index: [{}, {}], progress index: {} in 
async close tsfile {}",
               storageGroupName,
-              tsFileResource.getTsFile().getAbsolutePath(),
+              workMemTable.getMemTableId(),
               workMemTable.memSize(),
               tsFileResource.getTsFileSize(),
               workMemTable.getMinPlanIndex(),
               workMemTable.getMaxPlanIndex(),
-              tsFileResource.getMaxProgressIndex());
+              tsFileResource.getMaxProgressIndex(),
+              tsFileResource.getTsFile().getAbsolutePath());
         } else {
           logger.info(
               "{}: flush a NotifyFlushMemTable in async close tsfile {}, 
tsfile size: {}",
@@ -900,7 +943,12 @@ public class TsFileProcessor {
         // When invoke closing TsFile after insert data to memTable, we 
shouldn't flush until invoke
         // flushing memTable in System module.
         addAMemtableIntoFlushingList(tmpMemTable);
-        logger.info("Memtable {} has been added to flushing list", 
tmpMemTable);
+        logger.info(
+            "Memtable {}: {} with size {} and points {} has been added to 
flushing list",
+            tmpMemTable.getMemTableId(),
+            tmpMemTable,
+            tmpMemTable.memSize(),
+            tmpMemTable.getTotalPointsNum());
         shouldClose = true;
       } catch (Exception e) {
         logger.error(
@@ -982,7 +1030,9 @@ public class TsFileProcessor {
         return;
       }
       logger.info(
-          "Async flush a memtable to tsfile: {}", 
tsFileResource.getTsFile().getAbsolutePath());
+          "Async flush a memtable {} to tsfile: {}",
+          workMemTable.getMemTableId(),
+          tsFileResource.getTsFile().getAbsolutePath());
       addAMemtableIntoFlushingList(workMemTable);
     } catch (Exception e) {
       logger.error(
@@ -1007,7 +1057,9 @@ public class TsFileProcessor {
   private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws 
IOException {
     Map<String, Long> lastTimeForEachDevice = new HashMap<>();
     if (sequence) {
-      lastTimeForEachDevice = tobeFlushed.getMaxTime();
+      // TODO: change how lastFlushTime is obtained
+      lastTimeForEachDevice = tobeFlushed.getTopKTime();
+      // lastTimeForEachDevice = tobeFlushed.getMaxTime();
       // If some devices have been removed in MemTable, the number of device 
in MemTable and
       // tsFileResource will not be the same. And the endTime of these devices 
in resource will be
       // Long.minValue.
@@ -1029,7 +1081,19 @@ public class TsFileProcessor {
     if (enableMemControl) {
       
SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
     }
-    flushingMemTables.addLast(tobeFlushed);
+    // TODO: for the sequence area with k > 0, put the divided parts into separate waiting lists; otherwise flush as before
+    if (!sequence || MEMTABLE_TOPK_SIZE == 0) {
+      flushingMemTables.addLast(tobeFlushed);
+      workMemTable = null;
+    } else {
+      try {
+        createNewWorkingMemTable(tobeFlushed);
+      } catch (WriteProcessException e) {
+        throw new RuntimeException(e);
+      }
+      flushingMemTables.addLast(tobeFlushed);
+    }
+
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: {} Memtable (signal = {}) is added into the flushing Memtable, 
queue size = {}",
@@ -1042,7 +1106,6 @@ public class TsFileProcessor {
     if (!(tobeFlushed.isSignalMemTable() || tobeFlushed.isEmpty())) {
       totalMemTableSize += tobeFlushed.memSize();
     }
-    workMemTable = null;
     FlushManager.getInstance().registerTsFileProcessor(this);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 21ba12505b7..d954fe35bc4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -49,7 +50,13 @@ public class WritableMemChunk implements IWritableMemChunk {
     this.list = TVList.newList(schema.getType());
   }
 
-  private WritableMemChunk() {}
+  public WritableMemChunk(IMeasurementSchema schema, TVList ls) {
+    this.schema = schema;
+    this.list = ls;
+  }
+
+  // private WritableMemChunk() {}
+  public WritableMemChunk() {}
 
   @Override
   public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
@@ -245,11 +252,21 @@ public class WritableMemChunk implements 
IWritableMemChunk {
     return schema;
   }
 
+  @Override
+  public void setSchema(IMeasurementSchema s) {
+    this.schema = s;
+  }
+
   @Override
   public long getMaxTime() {
     return list.getMaxTime();
   }
 
+  @Override
+  public long getTopKTime() {
+    return list.getTopKTime();
+  }
+
   @Override
   public long getFirstPoint() {
     if (list.rowCount() == 0) {
@@ -268,6 +285,11 @@ public class WritableMemChunk implements IWritableMemChunk 
{
         .getTimestamp();
   }
 
+  @Override
+  public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException {
+    return new WritableMemChunk(this.schema, list.divide());
+  }
+
   @Override
   public boolean isEmpty() {
     return list.rowCount() == 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 707eace7fa5..dbe03f14c02 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternUtil;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import 
org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -172,6 +173,25 @@ public class WritableMemChunkGroup implements 
IWritableMemChunkGroup {
     return maxTime;
   }
 
+  @Override
+  public long getTopKTime() {
+    long maxTime = Long.MIN_VALUE;
+    for (IWritableMemChunk memChunk : memChunkMap.values()) {
+      maxTime = Math.max(maxTime, memChunk.getTopKTime());
+    }
+    return maxTime;
+  }
+
+  @Override
+  public IWritableMemChunkGroup divide() throws 
TopkDivideMemoryNotEnoughException {
+    WritableMemChunkGroup topkMemChunkGroup = new WritableMemChunkGroup();
+    for (String key : memChunkMap.keySet()) {
+      IWritableMemChunk topkMemChunk = memChunkMap.get(key).divide();
+      topkMemChunkGroup.memChunkMap.put(key, topkMemChunk);
+    }
+    return topkMemChunkGroup;
+  }
+
   @Override
   public int serializedSize() {
     int size = 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
index 33264fc46c1..4f1edd83ca1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
@@ -40,6 +40,7 @@ public class PrimitiveArrayManager {
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   public static final int ARRAY_SIZE = CONFIG.getPrimitiveArraySize();
+  public static final int MEMTABLE_TOPK_SIZE = CONFIG.getSeqMemtableTopKSize();
 
   public static final TVListSortAlgorithm TVLIST_SORT_ALGORITHM = 
CONFIG.getTvListSortAlgorithm();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 95bbd231ab9..b84aec8ddd0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -43,8 +43,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
-import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*;
 import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 import static 
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static 
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
@@ -643,6 +642,37 @@ public abstract class AlignedTVList extends TVList {
     }
   }
 
+  @Override
+  public TVList divide() throws TopkDivideMemoryNotEnoughException {
+    if (MEMTABLE_TOPK_SIZE > rowCount || MEMTABLE_TOPK_SIZE < ARRAY_SIZE) {
+      throw new TopkDivideMemoryNotEnoughException(
+          String.format(
+              "WARNING: MEMTABLE_TOPK_SIZE is bigger than%d "
+                  + "the TVList's row count  %d"
+                  + "or is smaller than ARRAY_SIZE %d",
+              rowCount, MEMTABLE_TOPK_SIZE, ARRAY_SIZE));
+    }
+    AlignedTVList topkTVList = AlignedTVList.newAlignedList(dataTypes);
+    int truncatedIndex = (rowCount - MEMTABLE_TOPK_SIZE + ARRAY_SIZE - 1) / 
ARRAY_SIZE;
+
+    for (int i = truncatedIndex + 1; i <= rowCount / ARRAY_SIZE; i++) {
+      topkTVList.timestamps.add(timestamps.get(i));
+      for (int colIndex = 0; colIndex < values.size(); colIndex++) {
+        topkTVList.values.get(colIndex).add(values.get(colIndex).get(i));
+        topkTVList.bitMaps.get(colIndex).add(bitMaps.get(colIndex).get(i));
+      }
+    }
+    for (int i = rowCount / ARRAY_SIZE; i >= truncatedIndex + 1; i--) {
+      timestamps.remove(timestamps.size() - 1);
+      for (int colIndex = 0; colIndex < values.size(); colIndex++) {
+        topkTVList.values.get(colIndex).remove(values.get(colIndex).size() - 
1);
+        topkTVList.bitMaps.get(colIndex).remove(bitMaps.get(colIndex).size() - 
1);
+      }
+    }
+    rowCount = truncatedIndex * ARRAY_SIZE;
+    topKTime = timestamps.get(truncatedIndex - 1)[ARRAY_SIZE - 1];
+    return topkTVList;
+  }
   /**
    * Get the row index value in index column.
    *
@@ -745,6 +775,8 @@ public abstract class AlignedTVList extends TVList {
         checkExpansion();
       }
     }
+    sort(Math.max(0, start - MEMTABLE_TOPK_SIZE), end);
+    topKTime = Math.max(topKTime, getTime(Math.max(0, end - 
MEMTABLE_TOPK_SIZE)));
   }
 
   private void arrayCopy(Object[] value, int idx, int arrayIndex, int 
elementIndex, int remaining) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
index 6271a31734e..21783bc0370 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java
@@ -53,7 +53,7 @@ public interface BackwardSort extends QuickSort {
 
   /**
    * check block-inversions to find the proper block_size, which is a multiple 
of array_size. For
-   * totally ordered, the block_size will equals to array_size For totally 
reverse ordered, the
+   * totally ordered, the block_size will equal array_size. For totally 
reverse ordered, the
    * block_size will equals to the rowCount. INVERSION_RATIOS_THRESHOLD=0.005 
is a empiric value.
    *
    * @param timestamps
@@ -132,6 +132,6 @@ public interface BackwardSort extends QuickSort {
    * @param hi
    */
   default void sortBlock(int lo, int hi) {
-    qsort(lo, hi);
+    qSort(lo, hi);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index c64dbc94fd8..1865efe6a9f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -34,8 +34,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
-import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*;
 
 public abstract class IntTVList extends TVList {
   // list of primitive array, add 1 when expanded -> int primitive array
@@ -77,14 +76,59 @@ public abstract class IntTVList extends TVList {
   @Override
   public void putInt(long timestamp, int value) {
     checkExpansion();
-    int arrayIndex = rowCount / ARRAY_SIZE;
-    int elementIndex = rowCount % ARRAY_SIZE;
-    maxTime = Math.max(maxTime, timestamp);
-    timestamps.get(arrayIndex)[elementIndex] = timestamp;
-    values.get(arrayIndex)[elementIndex] = value;
-    rowCount++;
-    if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
-      sorted = false;
+    if (MEMTABLE_TOPK_SIZE == 0) {
+      int arrayIndex = rowCount / ARRAY_SIZE;
+      int elementIndex = rowCount % ARRAY_SIZE;
+      maxTime = Math.max(maxTime, timestamp);
+      topKTime = Math.max(topKTime, timestamp);
+      timestamps.get(arrayIndex)[elementIndex] = timestamp;
+      values.get(arrayIndex)[elementIndex] = value;
+      rowCount++;
+      if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
+        sorted = false;
+      }
+    } else {
+      int arrayIndex;
+      int elementIndex;
+      int waitlen = Math.min(rowCount, MEMTABLE_TOPK_SIZE);
+      int tempRowCount = rowCount - 1; // cutoff position for the backward comparison
+      while (waitlen > 0) {
+        waitlen--;
+        arrayIndex = tempRowCount / ARRAY_SIZE;
+        elementIndex = tempRowCount % ARRAY_SIZE;
+        if (timestamps.get(arrayIndex)[elementIndex] > timestamp) {
+          tempRowCount--;
+          continue;
+        }
+        break;
+      }
+      arrayIndex = rowCount / ARRAY_SIZE;
+      elementIndex = rowCount % ARRAY_SIZE;
+      int arrayIndexLeft = (rowCount - 1) / ARRAY_SIZE;
+      int elementIndexLeft = (rowCount - 1) % ARRAY_SIZE;
+      for (int i = rowCount - 1; i > tempRowCount; i--) {
+        timestamps.get(arrayIndex)[elementIndex] = 
timestamps.get(arrayIndexLeft)[elementIndexLeft];
+        values.get(arrayIndex)[elementIndex] = 
values.get(arrayIndexLeft)[elementIndexLeft];
+        arrayIndex = arrayIndexLeft;
+        elementIndex = elementIndexLeft;
+        arrayIndexLeft = (i - 1) / ARRAY_SIZE;
+        elementIndexLeft = (i - 1) % ARRAY_SIZE;
+        sortCount++;
+      }
+      arrayIndex = (tempRowCount + 1) / ARRAY_SIZE;
+      elementIndex = (tempRowCount + 1) % ARRAY_SIZE;
+      maxTime = Math.max(maxTime, timestamp);
+      timestamps.get(arrayIndex)[elementIndex] = timestamp;
+      values.get(arrayIndex)[elementIndex] = value;
+      if (rowCount > MEMTABLE_TOPK_SIZE) {
+        arrayIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) / ARRAY_SIZE;
+        elementIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) % ARRAY_SIZE;
+        topKTime = Math.max(topKTime, 
timestamps.get(arrayIndex)[elementIndex]);
+      }
+      rowCount++;
+      if (sorted && rowCount > 1 && waitlen == 0) {
+        sorted = false;
+      }
     }
   }
 
@@ -123,6 +167,37 @@ public abstract class IntTVList extends TVList {
     values.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
   }
 
+  @Override
+  public TVList divide() throws TopkDivideMemoryNotEnoughException {
+    if (MEMTABLE_TOPK_SIZE > rowCount || MEMTABLE_TOPK_SIZE < ARRAY_SIZE) {
+      throw new TopkDivideMemoryNotEnoughException(
+          String.format(
+              "WARNING: MEMTABLE_TOPK_SIZE %d is bigger than "
+                  + "the TVList's row count %d, "
+                  + "or is smaller than ARRAY_SIZE %d",
+              MEMTABLE_TOPK_SIZE, rowCount, ARRAY_SIZE));
+    }
+    IntTVList topkTVList = IntTVList.newList();
+    int truncatedIndex = rowCount - MEMTABLE_TOPK_SIZE;
+    int truncatedArrayIndex =
+        truncatedIndex / ARRAY_SIZE; // no matter truncatedIndex in or not in 
the block
+    truncatedIndex = truncatedArrayIndex * ARRAY_SIZE;
+    for (int i = truncatedArrayIndex; i < timestamps.size(); i++) {
+      topkTVList.timestamps.add(timestamps.get(i));
+      topkTVList.values.add(values.get(i));
+    }
+    for (int i = timestamps.size() - 1; i >= truncatedArrayIndex; i--) {
+      timestamps.remove(timestamps.size() - 1);
+      values.remove(values.size() - 1);
+    }
+    topkTVList.rowCount = rowCount - truncatedIndex;
+    topkTVList.sorted = true;
+    topkTVList.topKTime = topkTVList.getTime(0);
+    rowCount = truncatedIndex;
+    topKTime = topkTVList.getTopKTime();
+    return topkTVList;
+  }
+
   @Override
   public TimeValuePair getTimeValuePair(int index) {
     return new TimeValuePair(
@@ -201,6 +276,11 @@ public abstract class IntTVList extends TVList {
         checkExpansion();
       }
     }
+    if (MEMTABLE_TOPK_SIZE != 0) {
+      sort(Math.max(0, start - MEMTABLE_TOPK_SIZE), rowCount);
+      int index = Math.max(0, rowCount - MEMTABLE_TOPK_SIZE - 1);
+      topKTime = Math.max(topKTime, getTime(index));
+    }
   }
 
   // move null values to the end of time array and value array, then return 
number of null values
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
index 5e6a9dec2dc..d60067af576 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java
@@ -30,11 +30,16 @@ public class QuickAlignedTVList extends AlignedTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
index dcdcbc2cb59..ee4e6f9cbc8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java
@@ -42,11 +42,16 @@ public class QuickBinaryTVList extends BinaryTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
index e4fbfaefca2..c7b43470551 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java
@@ -39,11 +39,16 @@ public class QuickBooleanTVList extends BooleanTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
index 852844f9231..584c8f8f789 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java
@@ -39,11 +39,16 @@ public class QuickDoubleTVList extends DoubleTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
index 409a00a093e..90485257276 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java
@@ -39,11 +39,16 @@ public class QuickFloatTVList extends FloatTVList 
implements QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
index bc44cf79053..d24842babac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java
@@ -22,11 +22,16 @@ public class QuickIntTVList extends IntTVList implements 
QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   public void swap(int p, int q) {
     int valueP = getInt(p);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
index 16d8fab3606..3d4f1ab1a7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java
@@ -39,11 +39,16 @@ public class QuickLongTVList extends LongTVList implements 
QuickSort {
   @Override
   public void sort() {
     if (!sorted) {
-      qsort(0, rowCount - 1);
+      qSort(0, rowCount - 1);
     }
     sorted = true;
   }
 
+  @Override
+  public void sort(int lo, int hi) {
+    qSort(lo, hi);
+  }
+
   @Override
   protected void set(int src, int dest) {
     long srcT = getTime(src);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
index 8229781aa05..9bad94d4f9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java
@@ -58,7 +58,7 @@ public interface QuickSort {
   //        }
   //    }
 
-  default void qsort(int lo, int hi) {
+  default void qSort(int lo, int hi) {
     if (lo < hi) {
       // TODO: use insertion sort in smaller array
       // if(hi - lo <= 32) {
@@ -66,8 +66,8 @@ public interface QuickSort {
       // }
       // partition
       int pivotIndex = partition(lo, hi);
-      qsort(lo, pivotIndex - 1);
-      qsort(pivotIndex + 1, hi);
+      qSort(lo, pivotIndex - 1);
+      qSort(pivotIndex + 1, hi);
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 246102bb7eb..788af81cf89 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
+import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE;
 import static 
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static 
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
 
@@ -58,6 +59,8 @@ public abstract class TVList implements WALEntryValue {
 
   protected boolean sorted = true;
   protected long maxTime;
+  protected long topKTime;
+  protected int sortCount;
   // record reference count of this tv list
   // currently this reference will only be increase because we can't know when 
to decrease it
   protected AtomicInteger referenceCount;
@@ -66,7 +69,9 @@ public abstract class TVList implements WALEntryValue {
   protected TVList() {
     timestamps = new ArrayList<>();
     rowCount = 0;
+    sortCount = 0;
     maxTime = Long.MIN_VALUE;
+    topKTime = Long.MIN_VALUE;
     referenceCount = new AtomicInteger();
   }
 
@@ -109,6 +114,8 @@ public abstract class TVList implements WALEntryValue {
 
   public abstract void sort();
 
+  public abstract void sort(int lo, int hi);
+
   public void increaseReferenceCount() {
     referenceCount.incrementAndGet();
   }
@@ -214,6 +221,10 @@ public abstract class TVList implements WALEntryValue {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  public TVList divide() throws TopkDivideMemoryNotEnoughException {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   public Object getAlignedValue(int index) {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
@@ -231,6 +242,10 @@ public abstract class TVList implements WALEntryValue {
     return maxTime;
   }
 
+  public long getTopKTime() {
+    return topKTime;
+  }
+
   public long getVersion() {
     return version;
   }
@@ -330,6 +345,7 @@ public abstract class TVList implements WALEntryValue {
     for (int i = start; i < end; i++) {
       inPutMinTime = Math.min(inPutMinTime, time[i]);
       maxTime = Math.max(maxTime, time[i]);
+      topKTime = Math.max(topKTime, time[Math.max(0, i - MEMTABLE_TOPK_SIZE)]);
       if (inputSorted && i < length - 1 && time[i] > time[i + 1]) {
         inputSorted = false;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
index 7c6fa7f92ef..819b00e3c93 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java
@@ -50,7 +50,7 @@ public class TimAlignedTVList extends AlignedTVList 
implements TimSort {
           (int[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      this.timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -58,7 +58,12 @@ public class TimAlignedTVList extends AlignedTVList 
implements TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
index 03760fd14c8..fccb64a1b2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java
@@ -44,14 +44,19 @@ public class TimBinaryTVList extends BinaryTVList 
implements TimSort {
       sortedValues =
           (Binary[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.TEXT, rowCount);
     }
-    sort(0, rowCount);
+    timSort(0, rowCount);
     clearSortedValue();
     clearSortedTime();
     sorted = true;
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
index 22b56475905..04d43f30bb9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java
@@ -43,7 +43,7 @@ public class TimBooleanTVList extends BooleanTVList 
implements TimSort {
           (boolean[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.BOOLEAN, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -51,7 +51,12 @@ public class TimBooleanTVList extends BooleanTVList 
implements TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
index 3c0aa823798..543a4e2ec29 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java
@@ -43,7 +43,7 @@ public class TimDoubleTVList extends DoubleTVList implements 
TimSort {
           (double[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.DOUBLE, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -51,7 +51,12 @@ public class TimDoubleTVList extends DoubleTVList implements 
TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
index 2b1fb440a95..d360959a81e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java
@@ -44,7 +44,7 @@ public class TimFloatTVList extends FloatTVList implements 
TimSort {
           (float[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.FLOAT, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -52,7 +52,12 @@ public class TimFloatTVList extends FloatTVList implements 
TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
index b3ae939892d..d2f153e05a1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java
@@ -33,18 +33,22 @@ public class TimIntTVList extends IntTVList implements 
TimSort {
 
   @Override
   public void sort() {
+    sort(0, rowCount);
+  }
+
+  @Override
+  public void sort(int lo, int hi) {
+    int len = hi - lo;
     if (sortedTimestamps == null
-        || sortedTimestamps.length < 
PrimitiveArrayManager.getArrayRowCount(rowCount)) {
+        || sortedTimestamps.length < 
PrimitiveArrayManager.getArrayRowCount(len)) {
       sortedTimestamps =
-          (long[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount);
+          (long[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, len);
     }
-    if (sortedValues == null
-        || sortedValues.length < 
PrimitiveArrayManager.getArrayRowCount(rowCount)) {
-      sortedValues =
-          (int[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount);
+    if (sortedValues == null || sortedValues.length < 
PrimitiveArrayManager.getArrayRowCount(len)) {
+      sortedValues = (int[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, len);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, len);
     }
     clearSortedValue();
     clearSortedTime();
@@ -52,7 +56,7 @@ public class TimIntTVList extends IntTVList implements 
TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
index e3a02e41db5..9096cd47e1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java
@@ -43,7 +43,7 @@ public class TimLongTVList extends LongTVList implements 
TimSort {
           (long[][]) 
PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount);
     }
     if (!sorted) {
-      sort(0, rowCount);
+      timSort(0, rowCount);
     }
     clearSortedValue();
     clearSortedTime();
@@ -51,7 +51,12 @@ public class TimLongTVList extends LongTVList implements 
TimSort {
   }
 
   @Override
-  public void tim_set(int src, int dest) {
+  public void sort(int lo, int hi) {
+    timSort(lo, hi);
+  }
+
+  @Override
+  public void timSet(int src, int dest) {
     set(src, dest);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
index 63584429d4a..0426fdafe66 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java
@@ -27,7 +27,7 @@ public interface TimSort {
   int SMALL_ARRAY_LENGTH = 32;
 
   /** the same as the 'set' function in TVList, the reason is to avoid two 
equal functions. */
-  void tim_set(int src, int dest);
+  void timSet(int src, int dest);
 
   void setFromSorted(int src, int dest);
 
@@ -55,7 +55,7 @@ public interface TimSort {
    * the entrance of tim_sort; 1. array_size <= 32, use binary sort. 2. 
recursively invoke merge
    * sort.
    */
-  default void sort(int lo, int hi) {
+  default void timSort(int lo, int hi) {
     if (lo == hi) {
       return;
     }
@@ -65,8 +65,8 @@ public interface TimSort {
       return;
     }
     int mid = (lo + hi) >>> 1;
-    sort(lo, mid);
-    sort(mid, hi);
+    timSort(lo, mid);
+    timSort(mid, hi);
     merge(lo, mid, hi);
   }
 
@@ -127,7 +127,7 @@ public interface TimSort {
        */
       int n = start - left; // The number of elements to move
       for (int i = n; i >= 1; i--) {
-        tim_set(left + i - 1, left + i);
+        timSet(left + i - 1, left + i);
       }
       setPivotTo(left);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
new file mode 100644
index 00000000000..e1d8d727ec6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.utils.datastructure;
+
+public class TopkDivideMemoryNotEnoughException extends Exception {
+
+  public TopkDivideMemoryNotEnoughException(String message) {
+    super(message);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 277e7c3d17c..6e971ec724c 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -516,6 +516,11 @@ cluster_name=defaultCluster
 # BACKWARD: backward sort
 # tvlist_sort_algorithm=TIM
 
+# The number of points that stay in the next tvlist
+# The default waiting size is 0
+# Datatype: int
+# seq_memtable_topk_size=0
+
 # When the average point number of timeseries in memtable exceeds this, the 
memtable is flushed to disk. The default threshold is 100000.
 # Datatype: int
 # avg_series_point_number_threshold=100000

Reply via email to