This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ef8dc52e89f perf: add encodeBatch interface to accelerate flushing for sequential insert (#15243)
ef8dc52e89f is described below

commit ef8dc52e89f8fa074c9beb5c814b8780aa972ef0
Author: shizy <[email protected]>
AuthorDate: Tue Apr 8 10:26:48 2025 +0800

    perf: add encodeBatch interface to accelerate flushing for sequential insert (#15243)
    
    * perf: batch encode to improve insert performance
    
    * fix IoTDBSimpleQueryIT
    
    * move batchEncodeInfo / times as MemTableFlushTask member
---
 .../dataregion/flush/MemTableFlushTask.java        |  15 +-
 .../memtable/AbstractWritableMemChunk.java         |   4 +-
 .../memtable/AlignedWritableMemChunk.java          | 114 ++++-----------
 .../dataregion/memtable/IWritableMemChunk.java     |   3 +-
 .../dataregion/memtable/TsFileProcessor.java       |  59 ++++----
 .../dataregion/memtable/WritableMemChunk.java      |  93 +++----------
 .../db/utils/datastructure/AlignedTVList.java      | 154 ++++++++++++++++++++-
 ...{MemPointIterator.java => BatchEncodeInfo.java} |  27 +++-
 .../db/utils/datastructure/MemPointIterator.java   |   3 +
 .../MergeSortMultiAlignedTVListIterator.java       | 105 ++++++++++++++
 .../MergeSortMultiTVListIterator.java              |  77 ++++++++++-
 .../utils/datastructure/MultiTVListIterator.java   |  11 +-
 .../OrderedMultiAlignedTVListIterator.java         |  15 ++
 .../datastructure/OrderedMultiTVListIterator.java  |  31 ++++-
 .../iotdb/db/utils/datastructure/TVList.java       | 119 +++++++++++++---
 15 files changed, 595 insertions(+), 235 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index a14bc85e17a..17fce5dd197 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -27,12 +27,15 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import 
org.apache.iotdb.db.storageengine.dataregion.flush.pool.FlushSubTaskPoolManager;
+import 
org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunk;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
 import 
org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
+import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.write.chunk.IChunkWriter;
 import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -62,6 +65,9 @@ public class MemTableFlushTask {
       FlushSubTaskPoolManager.getInstance();
   private static final WritingMetrics WRITING_METRICS = 
WritingMetrics.getInstance();
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
+      TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
   /* storage group name -> last time */
   private static final Map<String, Long> flushPointsCache = new 
ConcurrentHashMap<>();
   private final Future<?> encodingTaskFuture;
@@ -82,6 +88,9 @@ public class MemTableFlushTask {
   private volatile long memSerializeTime = 0L;
   private volatile long ioTime = 0L;
 
+  private final BatchEncodeInfo encodeInfo;
+  private long[] times;
+
   /**
    * @param memTable the memTable to flush
    * @param writer the writer where memTable will be flushed to (current 
tsfile writer or vm writer)
@@ -98,6 +107,7 @@ public class MemTableFlushTask {
     this.dataRegionId = dataRegionId;
     this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
     this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
+    this.encodeInfo = new BatchEncodeInfo(0, 0, 0);
     LOGGER.debug(
         "flush task of database {} memtable is created, flushing to file {}.",
         storageGroup,
@@ -248,7 +258,10 @@ public class MemTableFlushTask {
             } else {
               long starTime = System.currentTimeMillis();
               IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
-              writableMemChunk.encode(ioTaskQueue);
+              if (writableMemChunk instanceof AlignedWritableMemChunk && times 
== null) {
+                times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
+              }
+              writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
               long subTaskTime = System.currentTimeMillis() - starTime;
               
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, 
subTaskTime);
               memSerializeTime += subTaskTime;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index d4b4cfde383..ac71e3da6bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -192,7 +193,8 @@ public abstract class AbstractWritableMemChunk implements 
IWritableMemChunk {
   public abstract IChunkWriter createIChunkWriter();
 
   @Override
-  public abstract void encode(BlockingQueue<Object> ioTaskQueue);
+  public abstract void encode(
+      BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] 
times);
 
   @Override
   public abstract void release();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index b6db03ec294..013b2d6109c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
 import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
 import org.apache.iotdb.db.utils.datastructure.MemPointIteratorFactory;
 import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -32,14 +33,12 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.TimeRange;
-import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.chunk.IChunkWriter;
-import org.apache.tsfile.write.chunk.ValueChunkWriter;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 
@@ -65,6 +64,7 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
   private final List<IMeasurementSchema> schemaList;
   private AlignedTVList list;
   private List<AlignedTVList> sortedList;
+  private long sortedRowCount = 0;
   private final boolean ignoreAllNullRows;
 
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
@@ -197,6 +197,7 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
       list.sort();
     }
     sortedList.add(list);
+    this.sortedRowCount += list.rowCount();
     this.list = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes));
     this.dataTypes = list.getTsDataTypes();
   }
@@ -352,15 +353,11 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
 
   @Override
   public long rowCount() {
-    return alignedListSize();
+    return sortedRowCount + list.rowCount();
   }
 
   public int alignedListSize() {
-    int rowCount = list.rowCount();
-    for (AlignedTVList alignedTvList : sortedList) {
-      rowCount += alignedTvList.rowCount();
-    }
-    return rowCount;
+    return (int) rowCount();
   }
 
   @Override
@@ -638,107 +635,56 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
   }
 
   @Override
-  public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
+  public synchronized void encode(
+      BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] 
times) {
     if (TVLIST_SORT_THRESHOLD == 0) {
       encodeWorkingAlignedTVList(ioTaskQueue);
       return;
     }
 
     AlignedChunkWriterImpl alignedChunkWriter = new 
AlignedChunkWriterImpl(schemaList);
+
     // create MergeSortAlignedTVListIterator.
     List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList);
     alignedTvLists.add(list);
+    List<Integer> columnIndexList = buildColumnIndexList(schemaList);
     MemPointIterator timeValuePairIterator =
-        MemPointIteratorFactory.create(dataTypes, null, alignedTvLists, 
ignoreAllNullRows);
-
-    int pointNumInPage = 0;
-    int pointNumInChunk = 0;
-    long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
+        MemPointIteratorFactory.create(
+            dataTypes, columnIndexList, alignedTvLists, ignoreAllNullRows);
 
     while (timeValuePairIterator.hasNextBatch()) {
-      TsBlock tsBlock = timeValuePairIterator.nextBatch();
-      if (tsBlock == null) {
-        continue;
+      timeValuePairIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
+      if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+        alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
+        encodeInfo.pointNumInPage = 0;
       }
-      for (int rowIndex = 0; rowIndex < tsBlock.getPositionCount(); 
rowIndex++) {
-        long time = tsBlock.getTimeByIndex(rowIndex);
-        times[pointNumInPage] = time;
 
-        for (int columnIndex = 0; columnIndex < dataTypes.size(); 
columnIndex++) {
-          ValueChunkWriter valueChunkWriter =
-              alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
-          if (tsBlock.getColumn(columnIndex).isNull(rowIndex)) {
-            valueChunkWriter.write(time, null, true);
-            continue;
-          }
-          switch (schemaList.get(columnIndex).getType()) {
-            case BOOLEAN:
-              valueChunkWriter.write(
-                  time, tsBlock.getColumn(columnIndex).getBoolean(rowIndex), 
false);
-              break;
-            case INT32:
-            case DATE:
-              valueChunkWriter.write(time, 
tsBlock.getColumn(columnIndex).getInt(rowIndex), false);
-              break;
-            case INT64:
-            case TIMESTAMP:
-              valueChunkWriter.write(time, 
tsBlock.getColumn(columnIndex).getLong(rowIndex), false);
-              break;
-            case FLOAT:
-              valueChunkWriter.write(
-                  time, tsBlock.getColumn(columnIndex).getFloat(rowIndex), 
false);
-              break;
-            case DOUBLE:
-              valueChunkWriter.write(
-                  time, tsBlock.getColumn(columnIndex).getDouble(rowIndex), 
false);
-              break;
-            case TEXT:
-            case BLOB:
-            case STRING:
-              valueChunkWriter.write(
-                  time, tsBlock.getColumn(columnIndex).getBinary(rowIndex), 
false);
-              break;
-            default:
-              break;
-          }
-        }
-        pointNumInPage++;
-        pointNumInChunk++;
-
-        // new page
-        if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE
-            || pointNumInChunk >= maxNumberOfPointsInChunk) {
-          alignedChunkWriter.write(times, pointNumInPage, 0);
-          pointNumInPage = 0;
-        }
-
-        // new chunk
-        if (pointNumInChunk >= maxNumberOfPointsInChunk) {
-          alignedChunkWriter.sealCurrentPage();
-          alignedChunkWriter.clearPageWriter();
-          try {
-            ioTaskQueue.put(alignedChunkWriter);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-          alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
-          pointNumInChunk = 0;
+      if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
+        alignedChunkWriter.sealCurrentPage();
+        alignedChunkWriter.clearPageWriter();
+        try {
+          ioTaskQueue.put(alignedChunkWriter);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
         }
+        alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
+        encodeInfo.reset();
       }
     }
 
     // last batch of points
-    if (pointNumInChunk > 0) {
-      if (pointNumInPage > 0) {
-        alignedChunkWriter.write(times, pointNumInPage, 0);
-        alignedChunkWriter.sealCurrentPage();
-        alignedChunkWriter.clearPageWriter();
+    if (encodeInfo.pointNumInChunk > 0) {
+      if (encodeInfo.pointNumInPage > 0) {
+        alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
       }
+      alignedChunkWriter.sealCurrentPage();
+      alignedChunkWriter.clearPageWriter();
       try {
         ioTaskQueue.put(alignedChunkWriter);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
+      encodeInfo.reset();
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 26abdf1d39c..13f09a05d8a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -111,7 +112,7 @@ public interface IWritableMemChunk extends WALEntryValue {
 
   IChunkWriter createIChunkWriter();
 
-  void encode(BlockingQueue<Object> ioTaskQueue);
+  void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, 
long[] times);
 
   void release();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index ecd6dc2be39..d56868671ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -652,17 +652,16 @@ public class TsFileProcessor {
       if (dataTypes[i] == null || measurements[i] == null) {
         continue;
       }
-      if (workMemTable.chunkNotExist(deviceId, measurements[i])) {
+      IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, 
measurements[i]);
+      if (memChunk == null) {
         // ChunkMetadataIncrement
         chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
         memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
       } else {
         // here currentChunkPointNum >= 1
-        IWritableMemChunk memChunk = 
workMemTable.getWritableMemChunk(deviceId, measurements[i]);
-        long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
+        long currentChunkPointNum = memChunk.rowCount();
         if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
-          memTableIncrement +=
-              memChunk != null ? 
memChunk.getWorkingTVList().tvListArrayMemCost() : 0;
+          memTableIncrement += 
memChunk.getWorkingTVList().tvListArrayMemCost();
         }
       }
       // TEXT data mem size
@@ -693,7 +692,8 @@ public class TsFileProcessor {
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
-        if (workMemTable.chunkNotExist(deviceId, measurements[i])
+        IWritableMemChunk memChunk = 
workMemTable.getWritableMemChunk(deviceId, measurements[i]);
+        if (memChunk == null
             && (!increasingMemTableInfo.containsKey(deviceId)
                 || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
           // ChunkMetadataIncrement
@@ -704,7 +704,6 @@ public class TsFileProcessor {
               .putIfAbsent(measurements[i], 1);
         } else {
           // here currentChunkPointNum >= 1
-          IWritableMemChunk memChunk = 
workMemTable.getWritableMemChunk(deviceId, measurements[i]);
           int addingPointNum =
               increasingMemTableInfo
                   .computeIfAbsent(deviceId, k -> new HashMap<>())
@@ -741,7 +740,9 @@ public class TsFileProcessor {
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
 
-    if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
+    IWritableMemChunk memChunk =
+        workMemTable.getWritableMemChunk(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER);
+    if (memChunk == null) {
       // For new device of this mem table
       // ChunkMetadataIncrement
       chunkMetadataIncrement +=
@@ -750,9 +751,7 @@ public class TsFileProcessor {
       memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, 
columnCategories);
     } else {
       // For existed device of this mem table
-      AlignedWritableMemChunk alignedMemChunk =
-          ((AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId))
-              .getAlignedMemChunk();
+      AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) 
memChunk;
       List<TSDataType> dataTypesInTVList = new ArrayList<>();
       for (int i = 0; i < dataTypes.length; i++) {
         // Skip failed Measurements
@@ -775,8 +774,7 @@ public class TsFileProcessor {
       }
       // this insertion will result in a new array
       if ((alignedMemChunk.alignedListSize() % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
-        dataTypesInTVList.addAll(
-            ((AlignedTVList) 
alignedMemChunk.getWorkingTVList()).getTsDataTypes());
+        
dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes());
         memTableIncrement += 
alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost();
       }
     }
@@ -805,8 +803,10 @@ public class TsFileProcessor {
       TSDataType[] dataTypes = insertRowNode.getDataTypes();
       Object[] values = insertRowNode.getValues();
       String[] measurements = insertRowNode.getMeasurements();
-      if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)
-          && !increasingMemTableInfo.containsKey(deviceId)) {
+
+      IWritableMemChunk memChunk =
+          workMemTable.getWritableMemChunk(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER);
+      if (memChunk == null && !increasingMemTableInfo.containsKey(deviceId)) {
         // For new device of this mem table
         // ChunkMetadataIncrement
         chunkMetadataIncrement +=
@@ -829,10 +829,7 @@ public class TsFileProcessor {
 
       } else {
         // For existed device of this mem table
-        AlignedWritableMemChunkGroup memChunkGroup =
-            (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
-        AlignedWritableMemChunk alignedMemChunk =
-            memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+        AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) 
memChunk;
         int currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
         List<TSDataType> dataTypesInTVList = new ArrayList<>();
         Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
@@ -867,8 +864,7 @@ public class TsFileProcessor {
         // Here currentChunkPointNum + addingPointNum >= 1
         if (((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
           if (alignedMemChunk != null) {
-            dataTypesInTVList.addAll(
-                ((AlignedTVList) 
alignedMemChunk.getWorkingTVList()).getTsDataTypes());
+            
dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes());
           }
           dataTypesInTVList.addAll(addingPointNumInfo.left.values());
           memTableIncrement +=
@@ -969,21 +965,19 @@ public class TsFileProcessor {
       Object column) {
     // memIncrements = [memTable, text, chunk metadata] respectively
 
-    if (workMemTable.chunkNotExist(deviceId, measurement)) {
+    IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, 
measurement);
+    if (memChunk == null) {
       // ChunkMetadataIncrement
       memIncrements[2] += ChunkMetadata.calculateRamSize(measurement, 
dataType);
       memIncrements[0] +=
           ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
               * TVList.tvListArrayMemCost(dataType);
     } else {
-      IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, 
measurement);
-      long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
+      long currentChunkPointNum = memChunk.rowCount();
       if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
         memIncrements[0] +=
             ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
-                * (memChunk != null
-                    ? memChunk.getWorkingTVList().tvListArrayMemCost()
-                    : TVList.tvListArrayMemCost(dataType));
+                * memChunk.getWorkingTVList().tvListArrayMemCost();
       } else {
         long acquireArray =
             (end - start - 1 + (currentChunkPointNum % 
PrimitiveArrayManager.ARRAY_SIZE))
@@ -1035,7 +1029,9 @@ public class TsFileProcessor {
     }
 
     // memIncrements = [memTable, text, chunk metadata] respectively
-    if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
+    IWritableMemChunk memChunk =
+        workMemTable.getWritableMemChunk(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER);
+    if (memChunk == null) {
       // new devices introduce new ChunkMetadata
       // ChunkMetadata memory Increment
       memIncrements[2] +=
@@ -1049,9 +1045,7 @@ public class TsFileProcessor {
       memIncrements[0] +=
           numArraysToAdd * AlignedTVList.alignedTvListArrayMemCost(dataTypes, 
columnCategories);
     } else {
-      AlignedWritableMemChunk alignedMemChunk =
-          ((AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId))
-              .getAlignedMemChunk();
+      AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) 
memChunk;
       List<TSDataType> dataTypesInTVList = new ArrayList<>();
       int currentPointNum = alignedMemChunk.alignedListSize();
       int newPointNum = currentPointNum + incomingPointNum;
@@ -1086,8 +1080,7 @@ public class TsFileProcessor {
 
       if (acquireArray != 0) {
         // memory of extending the TVList
-        dataTypesInTVList.addAll(
-            ((AlignedTVList) 
alignedMemChunk.getWorkingTVList()).getTsDataTypes());
+        
dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes());
         memIncrements[0] +=
             acquireArray * 
alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost();
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 5c8a41621ff..86bbbde0bdd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
 import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
 import org.apache.iotdb.db.utils.datastructure.MemPointIteratorFactory;
 import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -31,7 +32,6 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.common.TimeRange;
-import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -57,6 +57,7 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
   private IMeasurementSchema schema;
   private TVList list;
   private List<TVList> sortedList;
+  private long sortedRowCount = 0;
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WritableMemChunk.class);
@@ -79,6 +80,7 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
       list.sort();
     }
     sortedList.add(list);
+    this.sortedRowCount += list.rowCount();
     this.list = TVList.newList(schema.getType());
   }
 
@@ -272,11 +274,7 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
 
   @Override
   public long rowCount() {
-    long rowCount = list.rowCount();
-    for (TVList tvList : sortedList) {
-      rowCount += tvList.rowCount();
-    }
-    return rowCount;
+    return sortedRowCount + list.rowCount();
   }
 
   @Override
@@ -458,16 +456,17 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
   }
 
   @Override
-  public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
+  public synchronized void encode(
+      BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] 
times) {
     if (TVLIST_SORT_THRESHOLD == 0) {
       encodeWorkingTVList(ioTaskQueue);
       return;
     }
 
-    TSDataType tsDataType = schema.getType();
     ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
-    long dataSizeInCurrentChunk = 0;
-    int pointNumInCurrentChunk = 0;
+    if (sortedList.isEmpty()) {
+      encodeInfo.lastIterator = true;
+    }
 
     // create MultiTvListIterator. It need not handle float/double precision 
here.
     List<TVList> tvLists = new ArrayList<>(sortedList);
@@ -476,70 +475,21 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
         MemPointIteratorFactory.create(schema.getType(), tvLists);
 
     while (timeValuePairIterator.hasNextBatch()) {
-      TsBlock tsBlock = timeValuePairIterator.nextBatch();
-      if (tsBlock == null) {
-        continue;
-      }
-
-      for (int rowIndex = 0; rowIndex < tsBlock.getPositionCount(); 
rowIndex++) {
-        long time = tsBlock.getTimeByIndex(rowIndex);
-        // store last point for SDT
-        if (rowIndex + 1 == tsBlock.getPositionCount() && 
!timeValuePairIterator.hasNextBatch()) {
-          chunkWriterImpl.setLastPoint(true);
-        }
-
-        switch (tsDataType) {
-          case BOOLEAN:
-            chunkWriterImpl.write(time, 
tsBlock.getColumn(0).getBoolean(rowIndex));
-            dataSizeInCurrentChunk += 8L + 1L;
-            break;
-          case INT32:
-          case DATE:
-            chunkWriterImpl.write(time, tsBlock.getColumn(0).getInt(rowIndex));
-            dataSizeInCurrentChunk += 8L + 4L;
-            break;
-          case INT64:
-          case TIMESTAMP:
-            chunkWriterImpl.write(time, 
tsBlock.getColumn(0).getLong(rowIndex));
-            dataSizeInCurrentChunk += 8L + 8L;
-            break;
-          case FLOAT:
-            chunkWriterImpl.write(time, 
tsBlock.getColumn(0).getFloat(rowIndex));
-            dataSizeInCurrentChunk += 8L + 4L;
-            break;
-          case DOUBLE:
-            chunkWriterImpl.write(time, 
tsBlock.getColumn(0).getDouble(rowIndex));
-            dataSizeInCurrentChunk += 8L + 8L;
-            break;
-          case TEXT:
-          case BLOB:
-          case STRING:
-            Binary value = tsBlock.getColumn(0).getBinary(rowIndex);
-            chunkWriterImpl.write(time, value);
-            dataSizeInCurrentChunk += 8L + getBinarySize(value);
-            break;
-          default:
-            LOGGER.error("WritableMemChunk does not support data type: {}", 
tsDataType);
-            break;
-        }
-
-        pointNumInCurrentChunk++;
-        if (pointNumInCurrentChunk > MAX_NUMBER_OF_POINTS_IN_CHUNK
-            || dataSizeInCurrentChunk > TARGET_CHUNK_SIZE) {
-          chunkWriterImpl.sealCurrentPage();
-          chunkWriterImpl.clearPageWriter();
-          try {
-            ioTaskQueue.put(chunkWriterImpl);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-          chunkWriterImpl = createIChunkWriter();
-          dataSizeInCurrentChunk = 0;
-          pointNumInCurrentChunk = 0;
+      timeValuePairIterator.encodeBatch(chunkWriterImpl, encodeInfo, times);
+      if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
+          || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+        chunkWriterImpl.sealCurrentPage();
+        chunkWriterImpl.clearPageWriter();
+        try {
+          ioTaskQueue.put(chunkWriterImpl);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
         }
+        chunkWriterImpl = createIChunkWriter();
+        encodeInfo.resetPointAndSize();
       }
     }
-    if (pointNumInCurrentChunk != 0) {
+    if (encodeInfo.pointNumInChunk != 0) {
       chunkWriterImpl.sealCurrentPage();
       chunkWriterImpl.clearPageWriter();
       try {
@@ -547,6 +497,7 @@ public class WritableMemChunk extends 
AbstractWritableMemChunk {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
+      encodeInfo.reset();
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 548fa4d76b7..1cb913873fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -44,6 +45,9 @@ import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkWriter;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
 
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -1566,6 +1570,8 @@ public abstract class AlignedTVList extends TVList {
 
     private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
         
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+    private long maxNumberOfPointsInChunk =
+        IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
 
     public AlignedTVListIterator(
         List<TSDataType> dataTypeList,
@@ -1591,6 +1597,10 @@ public abstract class AlignedTVList extends TVList {
       for (int i = 0; i < dataTypeList.size(); i++) {
         valueColumnDeleteCursor.add(new int[] {0});
       }
+      int avgPointSizeOfLargestColumn = getAvgPointSizeOfLargestColumn();
+      long TARGET_CHUNK_SIZE = 
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+      maxNumberOfPointsInChunk =
+          Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE / 
avgPointSizeOfLargestColumn));
     }
 
     @Override
@@ -1756,7 +1766,7 @@ public abstract class AlignedTVList extends TVList {
       int startIndex = index;
       // time column
       for (; index < rows; index++) {
-        if (validRowCount > MAX_NUMBER_OF_POINTS_IN_PAGE) {
+        if (validRowCount >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
           break;
         }
         // skip empty row
@@ -1773,10 +1783,10 @@ public abstract class AlignedTVList extends TVList {
                 || (isTimeDeleted(nextRowIndex)))) {
           nextRowIndex++;
         }
-        long timestamp = getTime(index);
-        if ((nextRowIndex == rows || timestamp != getTime(nextRowIndex))
-            && !isPointDeleted(timestamp, timeColumnDeletion, deleteCursor)) {
-          timeBuilder.writeLong(getTime(index));
+        long time = getTime(index);
+        if ((nextRowIndex == rows || time != getTime(nextRowIndex))
+            && !isPointDeleted(time, timeColumnDeletion, deleteCursor)) {
+          timeBuilder.writeLong(time);
           validRowCount++;
         } else {
           if (Objects.isNull(timeInvalidInfo)) {
@@ -1937,6 +1947,140 @@ public abstract class AlignedTVList extends TVList {
       return builder.build();
     }
 
+    @Override
+    public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo 
encodeInfo, long[] times) {
+      AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriter;
+
+      // duplicated time or deleted time are all invalid, true if we don't 
need this row
+      BitMap timeDuplicateInfo = null;
+
+      int startIndex = index;
+      // time column
+      for (; index < rows; index++) {
+        if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk
+            || encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+          break;
+        }
+        // skip empty row
+        if (allValueColDeletedMap != null && 
allValueColDeletedMap.isMarked(getValueIndex(index))) {
+          continue;
+        }
+        if (isTimeDeleted(index)) {
+          continue;
+        }
+        int nextRowIndex = index + 1;
+        while (nextRowIndex < rows
+            && ((allValueColDeletedMap != null
+                    && 
allValueColDeletedMap.isMarked(getValueIndex(nextRowIndex)))
+                || (isTimeDeleted(nextRowIndex)))) {
+          nextRowIndex++;
+        }
+        long time = getTime(index);
+        if (nextRowIndex == rows || time != getTime(nextRowIndex)) {
+          times[encodeInfo.pointNumInPage++] = time;
+          encodeInfo.pointNumInChunk++;
+        } else {
+          if (Objects.isNull(timeDuplicateInfo)) {
+            timeDuplicateInfo = new BitMap(rows);
+          }
+          timeDuplicateInfo.mark(index);
+        }
+        index = nextRowIndex - 1;
+      }
+
+      int columnCount = dataTypeList.size();
+      // value columns
+      for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+        ValueChunkWriter valueChunkWriter =
+            alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
+        int validColumnIndex = columnIndexList.get(columnIndex);
+
+        // Pair of Time and Index
+        Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
+        if (Objects.nonNull(timeDuplicateInfo)) {
+          lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, 
null);
+        }
+        for (int sortedRowIndex = startIndex; sortedRowIndex < index; 
sortedRowIndex++) {
+          // skip empty row
+          if ((allValueColDeletedMap != null
+                  && 
allValueColDeletedMap.isMarked(getValueIndex(sortedRowIndex)))
+              || (isTimeDeleted(sortedRowIndex))) {
+            continue;
+          }
+          long time = getTime(sortedRowIndex);
+          // skip time duplicated or totally deleted rows
+          if (Objects.nonNull(timeDuplicateInfo)) {
+            if (!outer.isNullValue(getValueIndex(sortedRowIndex), 
validColumnIndex)) {
+              lastValidPointIndexForTimeDupCheck.left = 
getTime(sortedRowIndex);
+              lastValidPointIndexForTimeDupCheck.right = 
getValueIndex(sortedRowIndex);
+            }
+            if (timeDuplicateInfo.isMarked(sortedRowIndex)) {
+              continue;
+            }
+          }
+
+          // The part of code solves the following problem:
+          // Time: 1,2,2,3
+          // Value: 1,2,null,null
+          // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, 
write(T:1,V:1)
+          // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing 
value
+          // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, 
T:2==pair.left:2, write(T:2,V:2)
+          // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, 
T:3!=pair.left:2,
+          // write(T:3,V:null)
+          int originRowIndex;
+          if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
+              && (getTime(sortedRowIndex) == 
lastValidPointIndexForTimeDupCheck.left)) {
+            originRowIndex = lastValidPointIndexForTimeDupCheck.right;
+          } else {
+            originRowIndex = getValueIndex(sortedRowIndex);
+          }
+
+          boolean isNull = outer.isNullValue(originRowIndex, validColumnIndex);
+          switch (dataTypeList.get(columnIndex)) {
+            case BOOLEAN:
+              valueChunkWriter.write(
+                  time,
+                  !isNull && getBooleanByValueIndex(originRowIndex, 
validColumnIndex),
+                  isNull);
+              break;
+            case INT32:
+            case DATE:
+              valueChunkWriter.write(
+                  time, isNull ? 0 : getIntByValueIndex(originRowIndex, 
validColumnIndex), isNull);
+              break;
+            case INT64:
+            case TIMESTAMP:
+              valueChunkWriter.write(
+                  time, isNull ? 0 : getLongByValueIndex(originRowIndex, 
validColumnIndex), isNull);
+              break;
+            case FLOAT:
+              valueChunkWriter.write(
+                  time,
+                  isNull ? 0 : getFloatByValueIndex(originRowIndex, 
validColumnIndex),
+                  isNull);
+              break;
+            case DOUBLE:
+              valueChunkWriter.write(
+                  time,
+                  isNull ? 0 : getDoubleByValueIndex(originRowIndex, 
validColumnIndex),
+                  isNull);
+              break;
+            case TEXT:
+            case BLOB:
+            case STRING:
+              valueChunkWriter.write(
+                  time,
+                  isNull ? null : getBinaryByValueIndex(originRowIndex, 
validColumnIndex),
+                  isNull);
+              break;
+            default:
+              break;
+          }
+        }
+      }
+      probeNext = false;
+    }
+
     public int[] getSelectedIndices() {
       return selectedIndices;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
similarity index 57%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
index b6864a27cb4..b836524438c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
@@ -19,13 +19,28 @@
 
 package org.apache.iotdb.db.utils.datastructure;
 
-import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.reader.IPointReader;
+// BatchEncodeInfo struct
+public class BatchEncodeInfo {
+  public int pointNumInPage;
+  public int pointNumInChunk;
+  public long dataSizeInChunk;
+  public boolean lastIterator;
 
-public interface MemPointIterator extends IPointReader {
-  TsBlock getBatch(int tsBlockIndex);
+  public BatchEncodeInfo(int pointNumInPage, int pointNumInChunk, long 
dataSizeInChunk) {
+    this.pointNumInPage = pointNumInPage;
+    this.pointNumInChunk = pointNumInChunk;
+    this.dataSizeInChunk = dataSizeInChunk;
+    this.lastIterator = false;
+  }
 
-  boolean hasNextBatch();
+  public void reset() {
+    resetPointAndSize();
+    this.lastIterator = false;
+  }
 
-  TsBlock nextBatch();
+  public void resetPointAndSize() {
+    this.pointNumInPage = 0;
+    this.pointNumInChunk = 0;
+    this.dataSizeInChunk = 0;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
index b6864a27cb4..66fc547db20 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
 
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.write.chunk.IChunkWriter;
 
 public interface MemPointIterator extends IPointReader {
   TsBlock getBatch(int tsBlockIndex);
@@ -28,4 +29,6 @@ public interface MemPointIterator extends IPointReader {
   boolean hasNextBatch();
 
   TsBlock nextBatch();
+
+  void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo, 
long[] times);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
index a4ae369b3a8..8ff8597d51d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
@@ -19,11 +19,18 @@
 
 package org.apache.iotdb.db.utils.datastructure;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkWriter;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -47,6 +54,9 @@ public class MergeSortMultiAlignedTVListIterator extends 
MultiAlignedTVListItera
       new PriorityQueue<>(
           (a, b) -> a.left.equals(b.left) ? b.right.compareTo(a.right) : 
a.left.compareTo(b.left));
 
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+  private long maxNumberOfPointsInChunk = CONFIG.getTargetChunkPointNum();
+
   public MergeSortMultiAlignedTVListIterator(
       List<TSDataType> tsDataTypes,
       List<Integer> columnIndexList,
@@ -75,6 +85,17 @@ public class MergeSortMultiAlignedTVListIterator extends 
MultiAlignedTVListItera
       valueColumnDeleteCursor.add(new int[] {0});
     }
     this.ignoreAllNullRows = ignoreAllNullRows;
+
+    if (!alignedTvLists.isEmpty()) {
+      int avgPointSizeOfLargestColumn =
+          alignedTvLists.stream()
+              .mapToInt(AlignedTVList::getAvgPointSizeOfLargestColumn)
+              .max()
+              .getAsInt();
+      long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
+      maxNumberOfPointsInChunk =
+          Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE / 
avgPointSizeOfLargestColumn));
+    }
   }
 
   @Override
@@ -164,6 +185,90 @@ public class MergeSortMultiAlignedTVListIterator extends 
MultiAlignedTVListItera
     probeNext = false;
   }
 
+  @Override
+  public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo 
encodeInfo, long[] times) {
+    AlignedChunkWriterImpl alignedChunkWriterImpl = (AlignedChunkWriterImpl) 
chunkWriter;
+    while (hasNextTimeValuePair()) {
+      times[encodeInfo.pointNumInPage] = currentTime;
+      for (int columnIndex = 0; columnIndex < tsDataTypeList.size(); 
columnIndex++) {
+        ValueChunkWriter valueChunkWriter =
+            alignedChunkWriterImpl.getValueChunkWriterByIndex(columnIndex);
+        AlignedTVList alignedTVList =
+            
alignedTvListIterators.get(currentIteratorIndex(columnIndex)).getAlignedTVList();
+
+        // sanity check
+        int validColumnIndex =
+            columnIndexList != null ? columnIndexList.get(columnIndex) : 
columnIndex;
+        if (validColumnIndex < 0 || validColumnIndex >= 
alignedTVList.dataTypes.size()) {
+          valueChunkWriter.write(currentTime, null, true);
+          continue;
+        }
+        int valueIndex = 
alignedTVList.getValueIndex(currentRowIndex(columnIndex));
+
+        // null value
+        if (alignedTVList.isNullValue(valueIndex, validColumnIndex)) {
+          valueChunkWriter.write(currentTime, null, true);
+          continue;
+        }
+
+        switch (tsDataTypeList.get(columnIndex)) {
+          case BOOLEAN:
+            valueChunkWriter.write(
+                currentTime,
+                alignedTVList.getBooleanByValueIndex(valueIndex, 
validColumnIndex),
+                false);
+            break;
+          case INT32:
+          case DATE:
+            valueChunkWriter.write(
+                currentTime, alignedTVList.getIntByValueIndex(valueIndex, 
validColumnIndex), false);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            valueChunkWriter.write(
+                currentTime,
+                alignedTVList.getLongByValueIndex(valueIndex, 
validColumnIndex),
+                false);
+            break;
+          case FLOAT:
+            valueChunkWriter.write(
+                currentTime,
+                alignedTVList.getFloatByValueIndex(valueIndex, 
validColumnIndex),
+                false);
+            break;
+          case DOUBLE:
+            valueChunkWriter.write(
+                currentTime,
+                alignedTVList.getDoubleByValueIndex(valueIndex, 
validColumnIndex),
+                false);
+            break;
+          case TEXT:
+          case BLOB:
+          case STRING:
+            valueChunkWriter.write(
+                currentTime,
+                alignedTVList.getBinaryByValueIndex(valueIndex, 
validColumnIndex),
+                false);
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", 
tsDataTypeList.get(columnIndex)));
+        }
+      }
+      next();
+      encodeInfo.pointNumInPage++;
+      encodeInfo.pointNumInChunk++;
+
+      // new page
+      if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE
+          || encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
+        alignedChunkWriterImpl.write(times, encodeInfo.pointNumInPage, 0);
+        encodeInfo.pointNumInPage = 0;
+        break;
+      }
+    }
+  }
+
   @Override
   protected int currentIteratorIndex(int columnIndex) {
     return iteratorIndices[columnIndex];
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
index 1436d519cb5..2eb59c416ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
@@ -19,17 +19,30 @@
 
 package org.apache.iotdb.db.utils.datastructure;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkWriter;
 
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
+
 public class MergeSortMultiTVListIterator extends MultiTVListIterator {
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+  private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
+  private final long MAX_NUMBER_OF_POINTS_IN_CHUNK = 
CONFIG.getTargetChunkPointNum();
+
   private final List<Integer> probeIterators;
   private final PriorityQueue<Pair<Long, Integer>> minHeap =
       new PriorityQueue<>(
@@ -59,13 +72,15 @@ public class MergeSortMultiTVListIterator extends 
MultiTVListIterator {
 
     if (!minHeap.isEmpty()) {
       Pair<Long, Integer> top = minHeap.poll();
+      currentTime = top.left;
+      probeIterators.add(top.right);
+
       iteratorIndex = top.right;
-      probeIterators.add(iteratorIndex);
       rowIndex = tvListIterators.get(iteratorIndex).getIndex();
       hasNext = true;
 
       // duplicated timestamps
-      while (!minHeap.isEmpty() && minHeap.peek().left.longValue() == 
top.left.longValue()) {
+      while (!minHeap.isEmpty() && minHeap.peek().left == currentTime) {
         Pair<Long, Integer> element = minHeap.poll();
         probeIterators.add(element.right);
       }
@@ -80,4 +95,62 @@ public class MergeSortMultiTVListIterator extends 
MultiTVListIterator {
     }
     probeNext = false;
   }
+
+  @Override
+  public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo 
encodeInfo, long[] times) {
+    ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
+    while (hasNextTimeValuePair()) {
+      // remember current iterator and row index
+      TVList.TVListIterator currIterator = tvListIterators.get(iteratorIndex);
+      int row = rowIndex;
+      long time = currentTime;
+
+      // check if it is last point
+      next();
+      if (!hasNextTimeValuePair()) {
+        chunkWriterImpl.setLastPoint(true);
+      }
+
+      switch (tsDataType) {
+        case BOOLEAN:
+          chunkWriterImpl.write(time, 
currIterator.getTVList().getBoolean(row));
+          encodeInfo.dataSizeInChunk += 8L + 1L;
+          break;
+        case INT32:
+        case DATE:
+          chunkWriterImpl.write(time, currIterator.getTVList().getInt(row));
+          encodeInfo.dataSizeInChunk += 8L + 4L;
+          break;
+        case INT64:
+        case TIMESTAMP:
+          chunkWriterImpl.write(time, currIterator.getTVList().getLong(row));
+          encodeInfo.dataSizeInChunk += 8L + 8L;
+          break;
+        case FLOAT:
+          chunkWriterImpl.write(time, currIterator.getTVList().getFloat(row));
+          encodeInfo.dataSizeInChunk += 8L + 4L;
+          break;
+        case DOUBLE:
+          chunkWriterImpl.write(time, currIterator.getTVList().getDouble(row));
+          encodeInfo.dataSizeInChunk += 8L + 8L;
+          break;
+        case TEXT:
+        case BLOB:
+        case STRING:
+          Binary value = currIterator.getTVList().getBinary(row);
+          chunkWriterImpl.write(time, value);
+          encodeInfo.dataSizeInChunk += 8L + getBinarySize(value);
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", tsDataType));
+      }
+      encodeInfo.pointNumInChunk++;
+
+      if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
+          || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+        break;
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
index cb203a5abb9..d400f629971 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
@@ -42,6 +42,7 @@ public abstract class MultiTVListIterator implements 
MemPointIterator {
 
   protected boolean probeNext = false;
   protected boolean hasNext = false;
+  protected long currentTime = 0;
   protected int iteratorIndex = 0;
   protected int rowIndex = 0;
 
@@ -79,9 +80,7 @@ public abstract class MultiTVListIterator implements 
MemPointIterator {
     }
     TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
     TimeValuePair currentTvPair =
-        iterator
-            .getTVList()
-            .getTimeValuePair(rowIndex, iterator.currentTime(), 
floatPrecision, encoding);
+        iterator.getTVList().getTimeValuePair(rowIndex, currentTime, 
floatPrecision, encoding);
     next();
     return currentTvPair;
   }
@@ -92,9 +91,7 @@ public abstract class MultiTVListIterator implements 
MemPointIterator {
       return null;
     }
     TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
-    return iterator
-        .getTVList()
-        .getTimeValuePair(rowIndex, iterator.currentTime(), floatPrecision, 
encoding);
+    return iterator.getTVList().getTimeValuePair(rowIndex, currentTime, 
floatPrecision, encoding);
   }
 
   @Override
@@ -107,7 +104,7 @@ public abstract class MultiTVListIterator implements 
MemPointIterator {
     TsBlockBuilder builder = new 
TsBlockBuilder(Collections.singletonList(tsDataType));
     while (hasNextTimeValuePair() && builder.getPositionCount() < 
MAX_NUMBER_OF_POINTS_IN_PAGE) {
       TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
-      builder.getTimeColumnBuilder().writeLong(iterator.currentTime());
+      builder.getTimeColumnBuilder().writeLong(currentTime);
       switch (tsDataType) {
         case BOOLEAN:
           
builder.getColumnBuilder(0).writeBoolean(iterator.getTVList().getBoolean(rowIndex));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
index 75a94b4fcb4..dc7c35f6de1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
@@ -23,6 +23,7 @@ import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.chunk.IChunkWriter;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -104,6 +105,20 @@ public class OrderedMultiAlignedTVListIterator extends 
MultiAlignedTVListIterato
     probeNext = false;
   }
 
+  @Override
+  public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo 
encodeInfo, long[] times) {
+    while (iteratorIndex < alignedTvListIterators.size()) {
+      TVList.TVListIterator iterator = 
alignedTvListIterators.get(iteratorIndex);
+      if (!iterator.hasNextBatch()) {
+        iteratorIndex++;
+        continue;
+      }
+      iterator.encodeBatch(chunkWriter, encodeInfo, times);
+      break;
+    }
+    probeNext = false;
+  }
+
   @Override
   protected int currentIteratorIndex(int columnIndex) {
     return iteratorIndex;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
index 48e58aceab1..e20ae061f75 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.utils.datastructure;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.write.chunk.IChunkWriter;
 
 import java.util.List;
 
@@ -38,13 +39,14 @@ public class OrderedMultiTVListIterator extends 
MultiTVListIterator {
   @Override
   protected void prepareNext() {
     hasNext = false;
-    while (iteratorIndex < tvListIterators.size() - 1
-        && !tvListIterators.get(iteratorIndex).hasNextTimeValuePair()) {
-      iteratorIndex++;
-    }
-    TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
-    if (iterator.hasNextTimeValuePair()) {
+    while (iteratorIndex < tvListIterators.size() && !hasNext) {
+      TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
+      if (!iterator.hasNextTimeValuePair()) {
+        iteratorIndex++;
+        continue;
+      }
       rowIndex = iterator.getIndex();
+      currentTime = iterator.currentTime();
       hasNext = true;
     }
     probeNext = true;
@@ -55,4 +57,21 @@ public class OrderedMultiTVListIterator extends 
MultiTVListIterator {
     tvListIterators.get(iteratorIndex).next();
     probeNext = false;
   }
+
+  @Override
+  public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo 
encodeInfo, long[] times) {
+    while (iteratorIndex < tvListIterators.size()) {
+      TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
+      if (!iterator.hasNextBatch()) {
+        iteratorIndex++;
+        continue;
+      }
+      if (iteratorIndex == tvListIterators.size() - 1) {
+        encodeInfo.lastIterator = true;
+      }
+      iterator.encodeBatch(chunkWriter, encodeInfo, times);
+      break;
+    }
+    probeNext = false;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index b4392e7866a..5b0a5382a76 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -36,6 +37,9 @@ import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkWriter;
 
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -49,6 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
+import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
 import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
@@ -660,6 +665,10 @@ public abstract class TVList implements WALEntryValue {
 
     private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
         
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+    private final long TARGET_CHUNK_SIZE =
+        IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+    private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
+        IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
 
     public TVListIterator(
         List<TimeRange> deletionList, Integer floatPrecision, TSEncoding 
encoding) {
@@ -733,10 +742,11 @@ public abstract class TVList implements WALEntryValue {
       switch (dataType) {
         case BOOLEAN:
           while (index < rows && builder.getPositionCount() < 
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+            long time = getTime(index);
             if (!isNullValue(getValueIndex(index))
-                && !isPointDeleted(getTime(index), deletionList, deleteCursor)
-                && (index == rows - 1 || getTime(index) != getTime(index + 
1))) {
-              builder.getTimeColumnBuilder().writeLong(getTime(index));
+                && !isPointDeleted(time, deletionList, deleteCursor)
+                && (index == rows - 1 || time != getTime(index + 1))) {
+              builder.getTimeColumnBuilder().writeLong(time);
               builder.getColumnBuilder(0).writeBoolean(getBoolean(index));
               builder.declarePosition();
             }
@@ -746,10 +756,11 @@ public abstract class TVList implements WALEntryValue {
         case INT32:
         case DATE:
           while (index < rows && builder.getPositionCount() < 
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+            long time = getTime(index);
             if (!isNullValue(getValueIndex(index))
-                && !isPointDeleted(getTime(index), deletionList, deleteCursor)
-                && (index == rows - 1 || getTime(index) != getTime(index + 
1))) {
-              builder.getTimeColumnBuilder().writeLong(getTime(index));
+                && !isPointDeleted(time, deletionList, deleteCursor)
+                && (index == rows - 1 || time != getTime(index + 1))) {
+              builder.getTimeColumnBuilder().writeLong(time);
               builder.getColumnBuilder(0).writeInt(getInt(index));
               builder.declarePosition();
             }
@@ -759,10 +770,11 @@ public abstract class TVList implements WALEntryValue {
         case INT64:
         case TIMESTAMP:
           while (index < rows && builder.getPositionCount() < 
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+            long time = getTime(index);
             if (!isNullValue(getValueIndex(index))
-                && !isPointDeleted(getTime(index), deletionList, deleteCursor)
-                && (index == rows - 1 || getTime(index) != getTime(index + 
1))) {
-              builder.getTimeColumnBuilder().writeLong(getTime(index));
+                && !isPointDeleted(time, deletionList, deleteCursor)
+                && (index == rows - 1 || time != getTime(index + 1))) {
+              builder.getTimeColumnBuilder().writeLong(time);
               builder.getColumnBuilder(0).writeLong(getLong(index));
               builder.declarePosition();
             }
@@ -771,10 +783,11 @@ public abstract class TVList implements WALEntryValue {
           break;
         case FLOAT:
           while (index < rows && builder.getPositionCount() < 
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+            long time = getTime(index);
             if (!isNullValue(getValueIndex(index))
-                && !isPointDeleted(getTime(index), deletionList, deleteCursor)
-                && (index == rows - 1 || getTime(index) != getTime(index + 
1))) {
-              builder.getTimeColumnBuilder().writeLong(getTime(index));
+                && !isPointDeleted(time, deletionList, deleteCursor)
+                && (index == rows - 1 || time != getTime(index + 1))) {
+              builder.getTimeColumnBuilder().writeLong(time);
               builder
                   .getColumnBuilder(0)
                   .writeFloat(
@@ -786,10 +799,11 @@ public abstract class TVList implements WALEntryValue {
           break;
         case DOUBLE:
           while (index < rows && builder.getPositionCount() < 
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+            long time = getTime(index);
             if (!isNullValue(getValueIndex(index))
-                && !isPointDeleted(getTime(index), deletionList, deleteCursor)
-                && (index == rows - 1 || getTime(index) != getTime(index + 
1))) {
-              builder.getTimeColumnBuilder().writeLong(getTime(index));
+                && !isPointDeleted(time, deletionList, deleteCursor)
+                && (index == rows - 1 || time != getTime(index + 1))) {
+              builder.getTimeColumnBuilder().writeLong(time);
               builder
                   .getColumnBuilder(0)
                   .writeDouble(
@@ -803,22 +817,91 @@ public abstract class TVList implements WALEntryValue {
         case BLOB:
         case STRING:
           while (index < rows && builder.getPositionCount() < 
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+            long time = getTime(index);
             if (!isNullValue(getValueIndex(index))
-                && !isPointDeleted(getTime(index), deletionList, deleteCursor)
-                && (index == rows - 1 || getTime(index) != getTime(index + 
1))) {
-              builder.getTimeColumnBuilder().writeLong(getTime(index));
+                && !isPointDeleted(time, deletionList, deleteCursor)
+                && (index == rows - 1 || time != getTime(index + 1))) {
+              builder.getTimeColumnBuilder().writeLong(time);
               builder.getColumnBuilder(0).writeBinary(getBinary(index));
               builder.declarePosition();
             }
             index++;
           }
           break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
       }
       TsBlock tsBlock = builder.build();
       tsBlocks.add(tsBlock);
       return tsBlock;
     }
 
+    /**
+     * Writes points from this iterator into the given chunk writer, starting at the current
+     * {@code index}, until either the rows are exhausted or one of the per-chunk limits
+     * ({@code MAX_NUMBER_OF_POINTS_IN_CHUNK}, {@code TARGET_CHUNK_SIZE}) is reached.
+     *
+     * <p>Rows whose value is marked null are skipped. When several consecutive rows share the
+     * same timestamp, only the last of them is written.
+     *
+     * @param chunkWriter destination writer; it is cast to {@code ChunkWriterImpl}, so any other
+     *     implementation fails with a {@code ClassCastException}
+     * @param encodeInfo running counters for the chunk being built; {@code pointNumInChunk} and
+     *     {@code dataSizeInChunk} are updated here, and {@code lastIterator} enables the
+     *     last-point marking on the writer
+     * @param times not read by this implementation
+     * @throws UnSupportedDataTypeException if the data type is not handled by the switch below
+     */
+    @Override
+    public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo 
encodeInfo, long[] times) {
+      TSDataType dataType = getDataType();
+      ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
+      for (; index < rows; index++) {
+        // rows with a null-marked value are never written
+        if (isNullValue(getValueIndex(index))) {
+          continue;
+        }
+        long time = getTime(index);
+        // duplicated timestamps: move to the last row carrying this timestamp
+        while (index + 1 < rows && time == getTime(index + 1)) {
+          index++;
+        }
+        // store last point for SDT
+        if (encodeInfo.lastIterator) {
+          // skip deleted rows
+          // NOTE(review): this loop can move index to a row with a different timestamp, or to
+          // index == rows, while `time` still holds the earlier value; the getters in the switch
+          // below then read at the moved index - confirm this is intended.
+          while (index < rows && isNullValue(getValueIndex(index))) {
+            index++;
+          }
+          if (index == rows || index == rows - 1) {
+            chunkWriterImpl.setLastPoint(true);
+          }
+        }
+
+        // size accounting: 8 for the timestamp plus the value width used for each type
+        switch (dataType) {
+          case BOOLEAN:
+            chunkWriterImpl.write(time, getBoolean(index));
+            encodeInfo.dataSizeInChunk += 8L + 1L;
+            break;
+          case INT32:
+          case DATE:
+            chunkWriterImpl.write(time, getInt(index));
+            encodeInfo.dataSizeInChunk += 8L + 4L;
+            break;
+          case INT64:
+          case TIMESTAMP:
+            chunkWriterImpl.write(time, getLong(index));
+            encodeInfo.dataSizeInChunk += 8L + 8L;
+            break;
+          case FLOAT:
+            chunkWriterImpl.write(time, getFloat(index));
+            encodeInfo.dataSizeInChunk += 8L + 4L;
+            break;
+          case DOUBLE:
+            chunkWriterImpl.write(time, getDouble(index));
+            encodeInfo.dataSizeInChunk += 8L + 8L;
+            break;
+          case TEXT:
+          case BLOB:
+          case STRING:
+            Binary value = getBinary(index);
+            chunkWriterImpl.write(time, value);
+            encodeInfo.dataSizeInChunk += 8L + getBinarySize(value);
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", dataType));
+        }
+        encodeInfo.pointNumInChunk++;
+        // stop once the chunk limits are reached so the caller can act on the counters
+        // NOTE(review): `break` skips the for-loop's index++, so index still refers to the row
+        // just written when this method returns - confirm the caller advances past it.
+        if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
+            || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+          break;
+        }
+      }
+    }
+
     @Override
     public long getUsedMemorySize() {
       return 0;

Reply via email to