This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/load_v2 by this push:
     new 593dd83d683 add statistics
593dd83d683 is described below

commit 593dd83d683565bdf7d7954c1e197b0fde2fac79
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Sep 27 09:08:17 2023 +0800

    add statistics
---
 .../load/DeviceBatchTsFileDataManager.java         |  16 +-
 .../execution/load/MergedTsFileSplitter.java       | 108 ++++--
 .../execution/load/TsFileDataManager.java          |   8 +-
 .../execution/load/TsFileSplitSender.java          | 369 ++++++++++++++-----
 .../load/locseq/FixedLocationSequencer.java        |   7 +-
 .../execution/load/locseq/LocationSequencer.java   |   6 +-
 .../execution/load/locseq/LocationStatistics.java  |  35 +-
 .../load/locseq/RandomLocationSequencer.java       |   6 +-
 .../locseq/ThroughputBasedLocationSequencer.java   |  66 ++--
 .../nodesplit/ClusteringMeasurementSplitter.java   | 401 ++++++++++++++++-----
 .../load/nodesplit/OrderedMeasurementSplitter.java |  11 +-
 .../load/nodesplit/PieceNodeSplitter.java          |   3 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   1 -
 .../execution/load/LoadTsFileSchedulerTest.java    | 125 ++++---
 .../execution/load/MergedTsFileSplitterTest.java   |   7 +-
 .../db/queryengine/execution/load/TestBase.java    | 141 +++++---
 .../execution/load/TsFileSplitSenderTest.java      | 200 ++++++----
 .../execution/load/TsFileSplitterTest.java         |   7 +-
 .../org/apache/iotdb/db/utils/SequenceUtils.java   |  42 ++-
 .../src/main/thrift/datanode.thrift                |   2 +
 20 files changed, 1078 insertions(+), 483 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
index d79bf322011..b343c0b4a06 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
@@ -19,19 +19,21 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import java.io.File;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 
-/**
- * Like TsFileDataManager, but one batch (TsFilePieceNode) belongs to the same device.
- */
-public class DeviceBatchTsFileDataManager extends TsFileDataManager{
+import java.io.File;
+
+/** Like TsFileDataManager, but one batch (TsFilePieceNode) belongs to the same device. */
+public class DeviceBatchTsFileDataManager extends TsFileDataManager {
 
   private String currentDeviceId;
 
-  public DeviceBatchTsFileDataManager(DispatchFunction dispatchFunction,
+  public DeviceBatchTsFileDataManager(
+      DispatchFunction dispatchFunction,
       PlanNodeId planNodeId,
-      File targetFile, DataPartitionBatchFetcher partitionBatchFetcher, long 
maxMemorySize) {
+      File targetFile,
+      DataPartitionBatchFetcher partitionBatchFetcher,
+      long maxMemorySize) {
     super(dispatchFunction, planNodeId, targetFile, partitionBatchFetcher, 
maxMemorySize);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
index 231b273c1d1..2f9464c0830 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
@@ -79,65 +79,99 @@ public class MergedTsFileSplitter {
   private final List<File> tsFiles;
   private final Function<TsFileData, Boolean> consumer;
   private final PriorityQueue<SplitTask> taskPriorityQueue;
+  private final int maxConcurrentFileNum;
   private ExecutorService asyncExecutor;
   private long timePartitionInterval;
   private AtomicInteger splitIdGenerator = new AtomicInteger();
+  private Statistic statistic = new Statistic();
 
   public MergedTsFileSplitter(
       List<File> tsFiles,
       Function<TsFileData, Boolean> consumer,
       ExecutorService asyncExecutor,
-      long timePartitionInterval) {
+      long timePartitionInterval,
+      int maxConcurrentFileNum) {
     this.tsFiles = tsFiles;
     this.consumer = consumer;
     this.asyncExecutor = asyncExecutor;
     this.timePartitionInterval = timePartitionInterval;
+    this.maxConcurrentFileNum = maxConcurrentFileNum;
     taskPriorityQueue = new PriorityQueue<>();
   }
 
   public void splitTsFileByDataPartition() throws IOException, 
IllegalStateException {
-    for (File tsFile : tsFiles) {
-      SplitTask splitTask = new SplitTask(tsFile, asyncExecutor);
+    long startTime = System.nanoTime();
+    int i = 0;
+    for (; i < tsFiles.size(); i++) {
+      // only allow at most maxConcurrentFileNum files to be merged at the same time
+      if (taskPriorityQueue.size() > maxConcurrentFileNum) {
+        break;
+      }
+
+      File tsFile = tsFiles.get(i);
+      SplitTask splitTask = new SplitTask(tsFile, asyncExecutor, i);
+      logger.info("Start to split {}", tsFiles.get(i));
       if (splitTask.hasNext()) {
         taskPriorityQueue.add(splitTask);
       }
     }
+    statistic.initTime = System.nanoTime() - startTime;
 
-    List<SplitTask> equalTasks = new ArrayList<>();
     while (!taskPriorityQueue.isEmpty()) {
+      startTime = System.nanoTime();
       SplitTask task = taskPriorityQueue.poll();
-      equalTasks.add(task);
-      // find chunks of the same series in other files
-      while (!taskPriorityQueue.isEmpty()) {
-        if (taskPriorityQueue.peek().compareTo(task) == 0) {
-          equalTasks.add(taskPriorityQueue.poll());
-        } else {
-          break;
-        }
-      }
+      TsFileData tsFileData = task.removeNext();
+      statistic.fetchDataTime += System.nanoTime() - startTime;
+      tsFileData.setSplitId(splitIdGenerator.incrementAndGet());
 
-      for (SplitTask equalTask : equalTasks) {
-        TsFileData tsFileData = equalTask.removeNext();
-        tsFileData.setSplitId(splitIdGenerator.incrementAndGet());
-        consumer.apply(tsFileData);
-        if (equalTask.hasNext()) {
-          taskPriorityQueue.add(equalTask);
+      startTime = System.nanoTime();
+      consumer.apply(tsFileData);
+      statistic.consumeTime += System.nanoTime() - startTime;
+
+
+      startTime = System.nanoTime();
+      if (task.hasNext()) {
+        taskPriorityQueue.add(task);
+      } else {
+        // when a file is exhausted, add the next non-empty file
+        for (; i < tsFiles.size(); i++) {
+          SplitTask splitTask = new SplitTask(tsFiles.get(i), asyncExecutor, 
i);
+          logger.info("Start to split {}", tsFiles.get(i));
+          if (splitTask.hasNext()) {
+            taskPriorityQueue.add(splitTask);
+            i++;
+            break;
+          }
         }
       }
-      equalTasks.clear();
+      statistic.enqueueTime += System.nanoTime() - startTime;
     }
   }
 
   public void close() throws IOException {
+    logger.info("Init/FetchData/Consume/Enqueue Time: {}/{}/{}/{}ms"
+        , statistic.initTime / 1_000_000L
+        , statistic.fetchDataTime / 1_000_000L
+        , statistic.consumeTime / 1_000_000L
+        , statistic.enqueueTime / 1_000_000L);
     for (SplitTask task : taskPriorityQueue) {
       task.close();
     }
     taskPriorityQueue.clear();
+    logger.info("Splitter closed.");
+  }
+
+  private class Statistic {
+    private long initTime;
+    private long fetchDataTime;
+    private long consumeTime;
+    private long enqueueTime;
   }
 
   private class SplitTask implements Comparable<SplitTask> {
 
     private final File tsFile;
+    private final int fileId;
     private TsFileSequenceReader reader;
     private final TreeMap<Long, List<Deletion>> offset2Deletions;
 
@@ -154,8 +188,9 @@ public class MergedTsFileSplitter {
     private ExecutorService asyncExecutor;
     private Future<Void> asyncTask;
 
-    public SplitTask(File tsFile, ExecutorService asyncExecutor) throws 
IOException {
+    public SplitTask(File tsFile, ExecutorService asyncExecutor, int fileId) 
throws IOException {
       this.tsFile = tsFile;
+      this.fileId = fileId;
       this.asyncExecutor = asyncExecutor;
       offset2Deletions = new TreeMap<>();
       init();
@@ -182,7 +217,11 @@ public class MergedTsFileSplitter {
         Comparator<ChunkData> chunkDataComparator =
             Comparator.comparing(ChunkData::getDevice, String::compareTo)
                 .thenComparing(ChunkData::firstMeasurement, String::compareTo);
-        return chunkDataComparator.compare(thisChunk, thatChunk);
+        int chunkCompare = chunkDataComparator.compare(thisChunk, thatChunk);
+        if (chunkCompare != 0) {
+          return chunkCompare;
+        }
+        return Integer.compare(this.fileId, o.fileId);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -225,7 +264,7 @@ public class MergedTsFileSplitter {
       while (reader != null && !Thread.interrupted()) {
         computeNext();
       }
-      logger.info("{} end splitting", tsFile);
+      logger.info("{} ends splitting", tsFile);
     }
 
     public boolean hasNext() throws IOException {
@@ -329,7 +368,6 @@ public class MergedTsFileSplitter {
             }
 
             ChunkHeader header = reader.readChunkHeader(marker);
-            String measurementId = header.getMeasurementID();
             if (header.getDataSize() == 0) {
               throw new TsFileRuntimeException(
                   String.format(
@@ -539,7 +577,8 @@ public class MergedTsFileSplitter {
       }
 
       @Override
-      public void writeToFileWriter(TsFileIOWriter writer) throws IOException 
{}
+      public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
+      }
 
       @Override
       public boolean isModification() {
@@ -547,7 +586,8 @@ public class MergedTsFileSplitter {
       }
 
       @Override
-      public void serialize(DataOutputStream stream) throws IOException {}
+      public void serialize(DataOutputStream stream) throws IOException {
+      }
 
       @Override
       public int getSplitId() {
@@ -555,7 +595,8 @@ public class MergedTsFileSplitter {
       }
 
       @Override
-      public void setSplitId(int sid) {}
+      public void setSplitId(int sid) {
+      }
     }
 
     private void getAllModification(Map<Long, List<Deletion>> 
offset2Deletions) throws IOException {
@@ -703,19 +744,6 @@ public class MergedTsFileSplitter {
       }
     }
 
-    /**
-     * handle empty page in aligned chunk, if uncompressedSize and compressedSize are both 0, and
-     * the statistics is null, then the page is empty.
-     *
-     * @param pageHeader page header
-     * @return true if the page is empty
-     */
-    private boolean isEmptyPage(PageHeader pageHeader) {
-      return pageHeader.getUncompressedSize() == 0
-          && pageHeader.getCompressedSize() == 0
-          && pageHeader.getStatistics() == null;
-    }
-
     private TsPrimitiveType[] decodeValuePage(
         TsFileSequenceReader reader,
         ChunkHeader chunkHeader,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
index b59b84a0d4b..bf1c79172b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.execution.load;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
-import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
@@ -101,8 +100,7 @@ public class TsFileDataManager {
     // start to dispatch from the biggest TsFilePieceNode
     List<TRegionReplicaSet> sortedReplicaSets =
         replicaSet2Piece.keySet().stream()
-            .sorted(
-                Comparator.comparingLong(o -> 
replicaSet2Piece.get(o).getDataSize()).reversed())
+            .sorted(Comparator.comparingLong(o -> 
replicaSet2Piece.get(o).getDataSize()).reversed())
             .collect(Collectors.toList());
 
     for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
@@ -126,7 +124,6 @@ public class TsFileDataManager {
     return true;
   }
 
-
   protected void routeChunkData() {
     if (nonDirectionalChunkData.isEmpty()) {
       return;
@@ -161,7 +158,8 @@ public class TsFileDataManager {
     routeChunkData();
 
     for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : 
replicaSet2Piece.entrySet()) {
-      if (entry.getValue().getDataSize() > 0 && 
!dispatchFunction.dispatchOnePieceNode(entry.getValue(), entry.getKey())) {
+      if (entry.getValue().getDataSize() > 0
+          && !dispatchFunction.dispatchOnePieceNode(entry.getValue(), 
entry.getKey())) {
         logger.warn("Dispatch piece node {} of TsFile {} error.", 
entry.getValue(), targetFile);
         return false;
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index 6ef1b578325..2f09a003106 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -19,20 +19,10 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import static 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -49,7 +39,6 @@ import 
org.apache.iotdb.db.queryengine.execution.load.locseq.ThroughputBasedLoca
 import 
org.apache.iotdb.db.queryengine.execution.load.nodesplit.ClusteringMeasurementSplitter;
 import 
org.apache.iotdb.db.queryengine.execution.load.nodesplit.OrderedMeasurementSplitter;
 import 
org.apache.iotdb.db.queryengine.execution.load.nodesplit.PieceNodeSplitter;
-import 
org.apache.iotdb.db.queryengine.execution.load.nodesplit.PieceNodeSplitter.SingletonSplitter;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
@@ -58,15 +47,36 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.utils.Pair;
+
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
+
 public class TsFileSplitSender {
 
   private static final int MAX_RETRY = 5;
   private static final long RETRY_INTERVAL_MS = 6_000L;
+  private static final int MAX_PENDING_PIECE_NODE = 5;
   private static final Logger logger = 
LoggerFactory.getLogger(TsFileSplitSender.class);
 
   private LoadTsFileNode loadTsFileNode;
@@ -83,8 +93,13 @@ public class TsFileSplitSender {
       new ConcurrentHashMap<>();
   private Map<TRegionReplicaSet, Exception> phaseTwoFailures = new HashMap<>();
   private long maxSplitSize;
-  private PieceNodeSplitter pieceNodeSplitter = new 
ClusteringMeasurementSplitter(10, 10);
-//  private PieceNodeSplitter pieceNodeSplitter = new 
OrderedMeasurementSplitter();
+  private PieceNodeSplitter pieceNodeSplitter = new 
ClusteringMeasurementSplitter(1.0, 10);
+//        private PieceNodeSplitter pieceNodeSplitter = new 
OrderedMeasurementSplitter();
+  private CompressionType compressionType = CompressionType.LZ4;
+  private Statistic statistic = new Statistic();
+  private ExecutorService splitNodeService;
+  private Queue<Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet>> 
splitFutures;
+  private int maxConcurrentFileNum;
 
   public TsFileSplitSender(
       LoadTsFileNode loadTsFileNode,
@@ -92,16 +107,21 @@ public class TsFileSplitSender {
       long targetPartitionInterval,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager,
       boolean isGeneratedByPipe,
-      long maxSplitSize) {
+      long maxSplitSize,
+      int maxConcurrentFileNum) {
     this.loadTsFileNode = loadTsFileNode;
     this.targetPartitionFetcher = targetPartitionFetcher;
     this.targetPartitionInterval = targetPartitionInterval;
     this.internalServiceClientManager = internalServiceClientManager;
     this.isGeneratedByPipe = isGeneratedByPipe;
     this.maxSplitSize = maxSplitSize;
+    this.splitNodeService = 
IoTDBThreadPoolFactory.newCachedThreadPool("SplitLoadTsFilePieceNode");
+    this.splitFutures = new ArrayDeque<>(MAX_PENDING_PIECE_NODE);
+    this.maxConcurrentFileNum = maxConcurrentFileNum;
   }
 
   public void start() throws IOException {
+    statistic.taskStartTime = System.currentTimeMillis();
     // skip files without data
     loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
     uuid = UUID.randomUUID().toString();
@@ -113,10 +133,13 @@ public class TsFileSplitSender {
     } else {
       logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
     }
+    statistic.taskEndTime = System.currentTimeMillis();
     locationStatistics.logLocationStatistics();
+    statistic.logStatistic();
   }
 
   private boolean firstPhase(LoadTsFileNode node) throws IOException {
+    long start = System.currentTimeMillis();
     TsFileDataManager tsFileDataManager =
         new DeviceBatchTsFileDataManager(
             this::dispatchOnePieceNode,
@@ -134,16 +157,22 @@ public class TsFileSplitSender {
             new SynchronousQueue<>(),
             new IoTThreadFactory("MergedTsFileSplitter"),
             "MergedTsFileSplitter");
-    MergedTsFileSplitter splitter = new MergedTsFileSplitter(
-        node.getResources().stream()
-            .map(TsFileResource::getTsFile)
-            .collect(Collectors.toList()),
-        tsFileDataManager::addOrSendTsFileData,
-        executorService,
-        targetPartitionInterval);
+    MergedTsFileSplitter splitter =
+        new MergedTsFileSplitter(
+            node.getResources().stream()
+                .map(TsFileResource::getTsFile)
+                .collect(Collectors.toList()),
+            tsFileDataManager::addOrSendTsFileData,
+            executorService,
+            targetPartitionInterval,
+            maxConcurrentFileNum);
     splitter.splitTsFileByDataPartition();
     splitter.close();
-    return tsFileDataManager.sendAllTsFileData() && phaseOneFailures.isEmpty();
+    logger.info("Split ends after {}ms", System.currentTimeMillis() - start);
+    boolean success = tsFileDataManager.sendAllTsFileData() && 
processRemainingPieceNodes()
+        && phaseOneFailures.isEmpty();
+    logger.info("Cleanup ends after {}ms", System.currentTimeMillis() - start);
+    return success;
   }
 
   private boolean secondPhase(boolean isFirstPhaseSuccess) {
@@ -199,81 +228,237 @@ public class TsFileSplitSender {
   }
 
   public LocationSequencer createLocationSequencer(TRegionReplicaSet 
replicaSet) {
-//    return new FixedLocationSequencer(replicaSet);
-//    return new RandomLocationSequencer(replicaSet);
+    //    return new FixedLocationSequencer(replicaSet);
+    //    return new RandomLocationSequencer(replicaSet);
     return new ThroughputBasedLocationSequencer(replicaSet, 
locationStatistics);
   }
 
-  public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, 
TRegionReplicaSet replicaSet) {
-    allReplicaSets.add(replicaSet);
-
-    long start = System.currentTimeMillis();
-    List<LoadTsFilePieceNode> subNodes = pieceNodeSplitter.split(pieceNode);
-    logger.info("{} splits are generated after {}ms", subNodes.size(), 
System.currentTimeMillis() - start);
-
-    List<Boolean> subNodeResults = subNodes.stream().parallel().map(node -> {
-      long startTime = 0;
-      TTsFilePieceReq loadTsFileReq =
-          new TTsFilePieceReq(node.serializeToByteBuffer(), uuid, 
replicaSet.getRegionId());
-      loadTsFileReq.isRelay = true;
-      LocationSequencer locationSequencer = 
createLocationSequencer(replicaSet);
-
-      boolean loadSucceed = false;
-      Exception lastConnectionError = null;
-      TDataNodeLocation currLocation = null;
-      for (TDataNodeLocation location : locationSequencer) {
-        if (location.getDataNodeId() == 0 && logger.isDebugEnabled()) {
-          locationStatistics.logLocationStatistics();
-          logger.info("Chose location {}", location.getDataNodeId());
-        }
-        currLocation = location;
-        startTime = System.nanoTime();
-        for (int i = 0; i < MAX_RETRY; i++) {
-          try (SyncDataNodeInternalServiceClient client =
-              
internalServiceClientManager.borrowClient(currLocation.internalEndPoint)) {
-            TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
-            logger.debug("Response from {}: {}", location.getDataNodeId(), 
loadResp);
-            if (!loadResp.isAccepted()) {
-              logger.warn(loadResp.message);
-              phaseOneFailures.put(
-                  new Pair<>(node, replicaSet),
-                  new FragmentInstanceDispatchException(loadResp.status));
-              return false;
-            }
-            loadSucceed = true;
-            break;
-          } catch (ClientManagerException | TException e) {
-            lastConnectionError = e;
-          }
+  private ByteBuffer compressBuffer(ByteBuffer buffer) throws IOException {
+    statistic.rawSize.addAndGet(buffer.remaining());
+    if (compressionType.equals(CompressionType.UNCOMPRESSED)) {
+      statistic.compressedSize.addAndGet(buffer.remaining());
+      return buffer;
+    }
+    ICompressor compressor = ICompressor.getCompressor(compressionType);
+    int maxBytesForCompression = 
compressor.getMaxBytesForCompression(buffer.remaining()) + 1;
+    ByteBuffer compressed = ByteBuffer.allocate(maxBytesForCompression);
+    int compressLength = compressor.compress(buffer.array(),
+        buffer.arrayOffset() + buffer.position(), buffer.remaining(), 
compressed.array());
+    compressed.limit(compressLength);
+    statistic.compressedSize.addAndGet(compressLength);
+    return compressed;
+  }
 
-          try {
-            Thread.sleep(RETRY_INTERVAL_MS);
-          } catch (InterruptedException e) {
-            return false;
-          }
-        }
+  private Future<List<LoadTsFilePieceNode>> 
submitSplitPieceNode(LoadTsFilePieceNode pieceNode) {
+    return splitNodeService.submit(() -> pieceNodeSplitter.split(pieceNode));
+  }
 
-        if (loadSucceed) {
-          break;
-        }
+  private boolean processRemainingPieceNodes() {
+    List<LoadTsFilePieceNode> subNodes;
+    for (Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet> pair : 
splitFutures) {
+      try {
+        subNodes = pair.left.get();
+      } catch (InterruptedException | ExecutionException e) {
+        logger.error("Unexpected error during splitting node", e);
+        return false;
       }
-
-      if (!loadSucceed) {
-        String warning = NODE_CONNECTION_ERROR;
-        logger.warn(warning, currLocation, lastConnectionError);
-        TSStatus status = new TSStatus();
-        status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-        status.setMessage(warning + currLocation);
-        phaseOneFailures.put(
-            new Pair<>(node, replicaSet), new 
FragmentInstanceDispatchException(status));
+      if (!dispatchPieceNodes(subNodes, pair.right)) {
         return false;
       }
-      long timeConsumption = System.nanoTime() - startTime;
-      logger.debug("Time consumption: {}", timeConsumption);
-      locationStatistics.updateThroughput(currLocation, node.getDataSize(), 
timeConsumption);
-      return true;
-    }).collect(Collectors.toList());
+    }
+    return true;
+  }
+
+  private boolean dispatchPieceNodes(List<LoadTsFilePieceNode> subNodes,
+      TRegionReplicaSet replicaSet) {
+    AtomicLong minDispatchTime = new AtomicLong(Long.MAX_VALUE);
+    AtomicLong maxDispatchTime = new AtomicLong(Long.MIN_VALUE);
+    AtomicLong sumDispatchTime = new AtomicLong();
+    AtomicLong sumCompressingTime = new AtomicLong();
+
+    long start = System.nanoTime();
+    List<Boolean> subNodeResults =
+        subNodes.stream()
+            .parallel()
+            .map(
+                node -> {
+                  ByteBuffer buffer;
+                  long startTime = System.nanoTime();
+                  int uncompressedLength;
+                  try {
+                    buffer = node.serializeToByteBuffer();
+                    uncompressedLength = buffer.remaining();
+                    buffer = compressBuffer(buffer);
+                  } catch (IOException e) {
+                    phaseOneFailures.put(new Pair<>(node, replicaSet), e);
+                    return false;
+                  }
+                  long compressingTime = System.nanoTime() - startTime;
+                  sumCompressingTime.addAndGet(compressingTime);
+                  statistic.compressingTime.addAndGet(compressingTime);
+
+                  TTsFilePieceReq loadTsFileReq =
+                      new TTsFilePieceReq(buffer, uuid, 
replicaSet.getRegionId());
+                  loadTsFileReq.isRelay = true;
+                  
loadTsFileReq.setCompressionType(compressionType.serialize());
+                  loadTsFileReq.setUncompressedLength(uncompressedLength);
+                  LocationSequencer locationSequencer = 
createLocationSequencer(replicaSet);
+
+                  boolean loadSucceed = false;
+                  Exception lastConnectionError = null;
+                  TDataNodeLocation currLocation = null;
+                  for (TDataNodeLocation location : locationSequencer) {
+                    if (location.getDataNodeId() == 0 && 
logger.isDebugEnabled()) {
+                      locationStatistics.logLocationStatistics();
+                      logger.info("Chose location {}", 
location.getDataNodeId());
+                    }
+                    currLocation = location;
+                    startTime = System.nanoTime();
+                    for (int i = 0; i < MAX_RETRY; i++) {
+                      try (SyncDataNodeInternalServiceClient client =
+                          internalServiceClientManager.borrowClient(
+                              currLocation.internalEndPoint)) {
+                        TLoadResp loadResp = 
client.sendTsFilePieceNode(loadTsFileReq);
+                        logger.debug("Response from {}: {}", 
location.getDataNodeId(), loadResp);
+                        if (!loadResp.isAccepted()) {
+                          logger.warn(loadResp.message);
+                          phaseOneFailures.put(
+                              new Pair<>(node, replicaSet),
+                              new 
FragmentInstanceDispatchException(loadResp.status));
+                          return false;
+                        }
+                        loadSucceed = true;
+                        break;
+                      } catch (ClientManagerException | TException e) {
+                        lastConnectionError = e;
+                      }
+
+                      try {
+                        Thread.sleep(RETRY_INTERVAL_MS);
+                      } catch (InterruptedException e) {
+                        return false;
+                      }
+                    }
 
+                    if (loadSucceed) {
+                      break;
+                    }
+                  }
+
+                  if (!loadSucceed) {
+                    String warning = NODE_CONNECTION_ERROR;
+                    logger.warn(warning, currLocation, lastConnectionError);
+                    TSStatus status = new TSStatus();
+                    
status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+                    status.setMessage(warning + currLocation);
+                    phaseOneFailures.put(
+                        new Pair<>(node, replicaSet),
+                        new FragmentInstanceDispatchException(status));
+                    return false;
+                  }
+                  long timeConsumption = System.nanoTime() - startTime;
+                  logger.debug("Time consumption: {}", timeConsumption);
+                  locationStatistics.updateThroughput(
+                      currLocation, node.getDataSize(), timeConsumption);
+
+                  synchronized (maxDispatchTime) {
+                    if (maxDispatchTime.get() < timeConsumption) {
+                      maxDispatchTime.set(timeConsumption);
+                    }
+                  }
+                  synchronized (minDispatchTime) {
+                    if (minDispatchTime.get() > timeConsumption) {
+                      minDispatchTime.set(timeConsumption);
+                    }
+                  }
+                  sumDispatchTime.addAndGet(timeConsumption);
+                  return true;
+                })
+            .collect(Collectors.toList());
+    long elapsedTime = System.nanoTime() - start;
+    statistic.dispatchNodesTime.addAndGet(elapsedTime);
+    logger.debug(
+        "Dispatched one node after {}ms, min/maxDispatching time: {}/{}ns, 
avg: {}ns, sum: {}ns, compressing: {}ns",
+        elapsedTime / 1_000_000L, minDispatchTime.get(),
+        maxDispatchTime.get(), sumDispatchTime.get() / subNodes.size(), 
sumDispatchTime.get(),
+        sumCompressingTime.get());
     return !subNodeResults.contains(false);
   }
+
+  public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, 
TRegionReplicaSet replicaSet) {
+    long allStart = System.nanoTime();
+    allReplicaSets.add(replicaSet);
+    if (false) {
+      long start = System.nanoTime();
+      List<LoadTsFilePieceNode> split = pieceNodeSplitter.split(pieceNode);
+      long elapsedTime = System.nanoTime() - start;
+      statistic.splitTime.addAndGet(elapsedTime);
+      statistic.pieceNodeNum.incrementAndGet();
+      logger.debug(
+          "{} splits are generated after {}ms", split.size(),
+          elapsedTime / 1_000_000L);
+      boolean success = dispatchPieceNodes(split, replicaSet);
+      statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+      return success;
+    }
+
+    List<LoadTsFilePieceNode> subNodes;
+    if (splitFutures.size() < MAX_PENDING_PIECE_NODE) {
+      splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode), 
replicaSet));
+      statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+      return true;
+    } else {
+      long start = System.nanoTime();
+      Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet> pair = 
splitFutures.poll();
+      try {
+        subNodes = pair.left.get();
+        long elapsedTime = System.nanoTime() - start;
+        statistic.splitTime.addAndGet(elapsedTime);
+        statistic.pieceNodeNum.incrementAndGet();
+        logger.debug(
+            "{} splits are generated after {}ms", subNodes.size(),
+            elapsedTime / 1_000_000L);
+
+        splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode), 
replicaSet));
+      } catch (InterruptedException | ExecutionException e) {
+        logger.error("Unexpected error during splitting node", e);
+        return false;
+      }
+      boolean success = dispatchPieceNodes(subNodes, pair.right);
+      statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+      return success;
+    }
+  }
+
+  public static class Statistic {
+
+    public long taskStartTime;
+    public long taskEndTime;
+    public AtomicLong rawSize = new AtomicLong();
+    public AtomicLong compressedSize = new AtomicLong();
+    public AtomicLong splitTime = new AtomicLong();
+    public AtomicLong pieceNodeNum = new AtomicLong();
+    public AtomicLong dispatchNodesTime = new AtomicLong();
+    public AtomicLong dispatchNodeTime = new AtomicLong();
+    public AtomicLong compressingTime = new AtomicLong();
+
+    public void logStatistic() {
+      logger.info("Time consumption: {}ms", taskEndTime - taskStartTime);
+      logger.info(
+          "Generated {} piece nodes, splitTime: {}, dispatchSplitsTime: {}, 
dispatchNodeTime: {}",
+          pieceNodeNum.get(),
+          splitTime.get() / 1_000_000L, dispatchNodesTime.get() / 1_000_000L,
+          dispatchNodeTime.get() / 1_000_000L);
+      logger.info(
+          "Transmission size: {}/{} ({}), compressionTime: {}ms",
+          compressedSize.get(),
+          rawSize.get(),
+          compressedSize.get() * 1.0 / rawSize.get(),
+          compressingTime.get() / 1_000_000L);
+    }
+  }
+
+  public Statistic getStatistic() {
+    return statistic;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
index 6bd49396f0e..b53bc75753f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
@@ -19,13 +19,14 @@
 
 package org.apache.iotdb.db.queryengine.execution.load.locseq;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
-public class FixedLocationSequencer implements LocationSequencer{
+public class FixedLocationSequencer implements LocationSequencer {
 
   private List<TDataNodeLocation> orderedLocations;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
index cb3d8723baf..ff1442a4678 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
@@ -19,10 +19,6 @@
 
 package org.apache.iotdb.db.queryengine.execution.load.locseq;
 
-import java.util.Iterator;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 
-public interface LocationSequencer extends Iterable<TDataNodeLocation> {
-
-
-}
+public interface LocationSequencer extends Iterable<TDataNodeLocation> {}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
index 88360b7a305..f4417174263 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
@@ -19,27 +19,27 @@
 
 package org.apache.iotdb.db.queryengine.execution.load.locseq;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.tsfile.utils.Pair;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
 public class LocationStatistics {
 
   private static final Logger logger = 
LoggerFactory.getLogger(LocationStatistics.class);
   private Map<TDataNodeLocation, Statistic> locationStatisticMap = new 
ConcurrentHashMap<>();
 
   public Statistic getStatistic(TDataNodeLocation location) {
-    return locationStatisticMap.computeIfAbsent(location,
-        l -> new Statistic());
+    return locationStatisticMap.computeIfAbsent(location, l -> new 
Statistic());
   }
 
   public void updateThroughput(TDataNodeLocation location, double sum, long 
cnt) {
-    Statistic statistic = locationStatisticMap.computeIfAbsent(location,
-        l -> new Statistic());
+    Statistic statistic = locationStatisticMap.computeIfAbsent(location, l -> 
new Statistic());
     statistic.updateThroughput(sum, cnt);
     statistic.increaseHit();
   }
@@ -70,11 +70,14 @@ public class LocationStatistics {
 
     @Override
     public String toString() {
-      return "{" +
-          "throughput=" + getThroughput() +
-          ", hit=" + hit +
-          ", lastHitTime=" + lastHitTime +
-          '}';
+      return "{"
+          + "throughput="
+          + getThroughput()
+          + ", hit="
+          + hit
+          + ", lastHitTime="
+          + lastHitTime
+          + '}';
     }
 
     public int getHit() {
@@ -88,9 +91,11 @@ public class LocationStatistics {
 
   public void logLocationStatistics() {
     if (logger.isInfoEnabled()) {
-      logger.info("Location throughput: {}", 
locationStatisticMap.entrySet().stream()
-          .map(e -> new Pair<>(e.getKey().getDataNodeId(), e.getValue()))
-          .collect(Collectors.toList()));
+      logger.info(
+          "Location throughput: {}",
+          locationStatisticMap.entrySet().stream()
+              .map(e -> new Pair<>(e.getKey().getDataNodeId(), e.getValue()))
+              .collect(Collectors.toList()));
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
index 8af95667865..85233c59de8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
@@ -19,16 +19,18 @@
 
 package org.apache.iotdb.db.queryengine.execution.load.locseq;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
 public class RandomLocationSequencer implements LocationSequencer {
 
   private List<TDataNodeLocation> orderedLocations;
+
   public RandomLocationSequencer(TRegionReplicaSet replicaSet) {
     orderedLocations = new ArrayList<>(replicaSet.dataNodeLocations);
     Collections.shuffle(orderedLocations);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
index 48cf1fa2c1b..3045db725c3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
@@ -19,39 +19,43 @@
 
 package org.apache.iotdb.db.queryengine.execution.load.locseq;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-import java.util.stream.Collectors;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import 
org.apache.iotdb.db.queryengine.execution.load.locseq.LocationStatistics.Statistic;
 import org.apache.iotdb.tsfile.utils.Pair;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
 public class ThroughputBasedLocationSequencer implements LocationSequencer {
 
-  private static final Logger logger = LoggerFactory.getLogger(
-      ThroughputBasedLocationSequencer.class);
+  private static final Logger logger =
+      LoggerFactory.getLogger(ThroughputBasedLocationSequencer.class);
   private Random random = new Random();
   private long resampleThresholdMS = 10000000;
   private List<TDataNodeLocation> orderedLocations;
 
-  public ThroughputBasedLocationSequencer(TRegionReplicaSet replicaSet,
-      LocationStatistics locationStatistics) {
-    List<Pair<TDataNodeLocation, Double>> locationRanks = 
rankLocations(replicaSet,
-        locationStatistics);
+  public ThroughputBasedLocationSequencer(
+      TRegionReplicaSet replicaSet, LocationStatistics locationStatistics) {
+    List<Pair<TDataNodeLocation, Double>> locationRanks =
+        rankLocations(replicaSet, locationStatistics);
     orderedLocations = new ArrayList<>(locationRanks.size());
     while (!locationRanks.isEmpty()) {
       // the chosen location is removed from the list
       orderedLocations.add(chooseNextLocation(locationRanks).left);
     }
     if (logger.isDebugEnabled()) {
-      logger.debug("Location orders: {}",
-          
orderedLocations.stream().map(TDataNodeLocation::getDataNodeId).collect(
-              Collectors.toList()));
+      logger.debug(
+          "Location orders: {}",
+          orderedLocations.stream()
+              .map(TDataNodeLocation::getDataNodeId)
+              .collect(Collectors.toList()));
     }
   }
 
@@ -62,8 +66,8 @@ public class ThroughputBasedLocationSequencer implements 
LocationSequencer {
    * @param replicaSet replica set to be ranked
    * @return the nodes and their ranks
    */
-  private List<Pair<TDataNodeLocation, Double>> 
rankLocations(TRegionReplicaSet replicaSet,
-      LocationStatistics locationStatistics) {
+  private List<Pair<TDataNodeLocation, Double>> rankLocations(
+      TRegionReplicaSet replicaSet, LocationStatistics locationStatistics) {
     List<Pair<TDataNodeLocation, Double>> locations =
         new ArrayList<>(replicaSet.dataNodeLocations.size());
     // retrieve throughput of each node
@@ -84,10 +88,14 @@ public class ThroughputBasedLocationSequencer implements 
LocationSequencer {
       totalThroughput += throughput;
     }
     if (logger.isInfoEnabled()) {
-      logger.debug("Location throughput: {}",
-          locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(), 
p.right)).collect(
-              Collectors.toList()));
-      logger.debug("Total throughput: {}, first rank {}", totalThroughput,
+      logger.debug(
+          "Location throughput: {}",
+          locations.stream()
+              .map(p -> new Pair<>(p.left.getDataNodeId(), p.right))
+              .collect(Collectors.toList()));
+      logger.debug(
+          "Total throughput: {}, first rank {}",
+          totalThroughput,
           locations.get(0).right / totalThroughput);
     }
 
@@ -98,9 +106,11 @@ public class ThroughputBasedLocationSequencer implements 
LocationSequencer {
       location.right = location.right / totalThroughput + locations.get(i - 
1).right;
     }
     if (logger.isInfoEnabled()) {
-      logger.debug("Location ranks: {}",
-          locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(), 
p.right)).collect(
-              Collectors.toList()));
+      logger.debug(
+          "Location ranks: {}",
+          locations.stream()
+              .map(p -> new Pair<>(p.left.getDataNodeId(), p.right))
+              .collect(Collectors.toList()));
     }
     return locations;
   }
@@ -115,9 +125,13 @@ public class ThroughputBasedLocationSequencer implements 
LocationSequencer {
       }
     }
     if (chosen == 0 && locations.size() == 3 && logger.isDebugEnabled()) {
-      logger.debug("Dice {}, chosen {}, ranks {}", dice, chosen,
-          locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(), 
p.right)).collect(
-              Collectors.toList()));
+      logger.debug(
+          "Dice {}, chosen {}, ranks {}",
+          dice,
+          chosen,
+          locations.stream()
+              .map(p -> new Pair<>(p.left.getDataNodeId(), p.right))
+              .collect(Collectors.toList()));
     }
     Pair<TDataNodeLocation, Double> chosenPair = locations.remove(chosen);
     // update ranks
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
index b8f8d0eeb30..7c53a6295b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
@@ -1,5 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.db.queryengine.execution.load.nodesplit;
 
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.OptionalDouble;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import net.ricecode.similarity.DiceCoefficientStrategy;
+import net.ricecode.similarity.JaroStrategy;
+import net.ricecode.similarity.LevenshteinDistanceStrategy;
+import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+
+import net.ricecode.similarity.JaroWinklerStrategy;
+import net.ricecode.similarity.SimilarityStrategy;
+import net.ricecode.similarity.StringSimilarityService;
+import net.ricecode.similarity.StringSimilarityServiceImpl;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -12,58 +57,167 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import net.ricecode.similarity.JaroWinklerStrategy;
-import net.ricecode.similarity.SimilarityStrategy;
-import net.ricecode.similarity.StringSimilarityService;
-import net.ricecode.similarity.StringSimilarityServiceImpl;
-import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClusteringMeasurementSplitter implements PieceNodeSplitter {
 
-  private int numCluster;
+  private static final Logger logger = 
LoggerFactory.getLogger(ClusteringMeasurementSplitter.class);
+  private double groupFactor;
   private int maxIteration;
-  private int dataSampleLength = 128;
-
-  public ClusteringMeasurementSplitter(int numCluster, int maxIteration) {
-    this.numCluster = numCluster;
+  private int dataSampleLength = 32;
+  private double stdErrThreshold = 1;
+  private long splitStartTime;
+  private long splitTimeBudget = 5;
+  private static final int MAX_CLUSTER_NUM = 500;
+
+  public ClusteringMeasurementSplitter(double groupFactor, int maxIteration) {
+    this.groupFactor = groupFactor;
     this.maxIteration = maxIteration;
   }
 
   @Override
   public List<LoadTsFilePieceNode> split(LoadTsFilePieceNode pieceNode) {
+    splitStartTime = System.currentTimeMillis();
     if (pieceNode.isHasModification()) {
-      // the order of modifications should be preserved, so with modifications 
clustering cannot be used
+      // the order of modifications should be preserved, so with modifications 
clustering cannot be
+      // used
       return new OrderedMeasurementSplitter().split(pieceNode);
     }
 
     // split by measurement first
-    Map<String, LoadTsFilePieceNode> measurementPieceNodeMap = new HashMap<>();
+    Map<String, LoadTsFilePieceNode> measurementPieceNodeMap = new HashMap<>(
+        pieceNode.getAllTsFileData().size());
     for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
       ChunkData chunkData = (ChunkData) tsFileData;
       String currMeasurement = chunkData.firstMeasurement();
-      LoadTsFilePieceNode pieceNodeSplit = 
measurementPieceNodeMap.computeIfAbsent(
-          currMeasurement,
-          m -> new LoadTsFilePieceNode(pieceNode.getPlanNodeId(), 
pieceNode.getTsFile()));
+      LoadTsFilePieceNode pieceNodeSplit =
+          measurementPieceNodeMap.computeIfAbsent(
+              currMeasurement,
+              m -> new LoadTsFilePieceNode(pieceNode.getPlanNodeId(), 
pieceNode.getTsFile()));
       pieceNodeSplit.addTsFileData(chunkData);
     }
     // use clustering to merge similar measurements
-    return clusterPieceNode(measurementPieceNodeMap);
+    List<LoadTsFilePieceNode> loadTsFilePieceNodes = 
clusterPieceNode(measurementPieceNodeMap);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Split distribution before refinement: {}",
+          loadTsFilePieceNodes.stream().map(l -> l.getAllTsFileData().size())
+              .collect(Collectors.toList()));
+      refineClusters(loadTsFilePieceNodes);
+      logger.debug("Split distribution after refinement: {}",
+          loadTsFilePieceNodes.stream().map(l -> l.getAllTsFileData().size())
+              .collect(Collectors.toList()));
+    }
+    return loadTsFilePieceNodes;
+  }
+
+  /**
+   * Rebalances the given splits in place. While the deviation of the split sizes exceeds
+   * {@code average * stdErrThreshold} and the time budget counted from {@code splitStartTime} is
+   * not used up, either the smallest splits are merged or the largest split is divided,
+   * depending on which of the two is further from the average.
+   *
+   * @param pieceNodes the splits to refine; the list is sorted and modified
+   */
+  private void refineClusters(List<LoadTsFilePieceNode> pieceNodes) {
+    double mean = averageDataSize(pieceNodes);
+    double deviation = dataSizeDeviation(pieceNodes, mean);
+    logger.debug(
+        "{} splits before refinement, average/stderr: {}/{}", pieceNodes.size(), mean, deviation);
+
+    while (deviation > mean * stdErrThreshold
+        && System.currentTimeMillis() - splitStartTime < splitTimeBudget) {
+      pieceNodes.sort(Comparator.comparingLong(LoadTsFilePieceNode::getDataSize));
+      double gapBelow = mean - pieceNodes.get(0).getDataSize();
+      double gapAbove = pieceNodes.get(pieceNodes.size() - 1).getDataSize() - mean;
+      // merge when the smallest split is the larger outlier; otherwise try to divide the
+      // largest one and fall back to merging if it is not splittable (only one series)
+      if (gapBelow > gapAbove || !splitLargePiece(pieceNodes)) {
+        mergeSmallPiece(pieceNodes);
+      }
+
+      mean = averageDataSize(pieceNodes);
+      deviation = dataSizeDeviation(pieceNodes, mean);
+      logger.debug("{} splits, average/stderr: {}/{}", pieceNodes.size(), mean, deviation);
+    }
+    logger.debug(
+        "{} splits after refinement, average/stderr: {}/{}", pieceNodes.size(), mean, deviation);
+  }
+
+  /** Returns the mean data size of the given splits, or 0.0 for an empty list. */
+  private double averageDataSize(List<LoadTsFilePieceNode> pieceNodes) {
+    return pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average().orElse(0.0);
+  }
+
+  /** Returns the square root of the mean squared distance of the split sizes from {@code mean}. */
+  private double dataSizeDeviation(List<LoadTsFilePieceNode> pieceNodes, double mean) {
+    double squaredSum =
+        pieceNodes.stream()
+            .mapToLong(LoadTsFilePieceNode::getDataSize)
+            .mapToDouble(s -> (s - mean) * (s - mean))
+            .sum();
+    return Math.sqrt(squaredSum / pieceNodes.size());
+  }
+
+  /**
+   * Replaces the first two pieces of the list with a single piece that holds the data of both;
+   * the merged piece is appended at the end of the list. {@code refineClusters} sorts the list
+   * by data size before calling this, so the first two pieces are the smallest ones.
+   *
+   * @param pieceNodes the pieces to modify in place; left untouched when it has fewer than two
+   *     elements
+   */
+  private void mergeSmallPiece(List<LoadTsFilePieceNode> pieceNodes) {
+    if (pieceNodes.size() < 2) {
+      // nothing to merge with
+      return;
+    }
+    LoadTsFilePieceNode pieceA = pieceNodes.remove(0);
+    LoadTsFilePieceNode pieceB = pieceNodes.remove(0);
+    LoadTsFilePieceNode newPiece =
+        new LoadTsFilePieceNode(pieceA.getPlanNodeId(), pieceA.getTsFile());
+    pieceA.getAllTsFileData().forEach(newPiece::addTsFileData);
+    pieceB.getAllTsFileData().forEach(newPiece::addTsFileData);
+    pieceNodes.add(newPiece);
+  }
+
+  /**
+   * Removes the last piece of the list and divides its data between two new pieces. Data is
+   * moved into the first piece until roughly half of the old piece's size has been consumed; the
+   * cut is only made where the first measurement changes, because a series cannot be broken into
+   * two pieces (the chunk order must be preserved). The first piece is always added back to the
+   * list, the second one only when it received data.
+   *
+   * @param pieceNodes the pieces to modify in place
+   * @return true if the removed piece was divided into two pieces, false if all of its data
+   *     ended up in a single piece again
+   */
+  private boolean splitLargePiece(List<LoadTsFilePieceNode> pieceNodes) {
+    LoadTsFilePieceNode source = pieceNodes.remove(pieceNodes.size() - 1);
+    LoadTsFilePieceNode firstHalf =
+        new LoadTsFilePieceNode(source.getPlanNodeId(), source.getTsFile());
+    LoadTsFilePieceNode secondHalf =
+        new LoadTsFilePieceNode(source.getPlanNodeId(), source.getTsFile());
+    long remainingBudget = source.getDataSize() / 2;
+    List<TsFileData> allData = source.getAllTsFileData();
+
+    String lastSeries = null;
+    int cursor = 0;
+    while (cursor < allData.size()) {
+      TsFileData data = allData.get(cursor);
+      // a modification follows the previous chunk data, so it never triggers the cut
+      ChunkData chunk = data.isModification() ? null : (ChunkData) data;
+      if (chunk != null) {
+        String series = chunk.firstMeasurement();
+        boolean startsNextSeries = lastSeries != null && !lastSeries.equals(series);
+        if (startsNextSeries && remainingBudget < 0) {
+          // the first half is full and a new series begins: the rest goes to the second half
+          break;
+        }
+        lastSeries = series;
+      }
+      firstHalf.addTsFileData(data);
+      if (chunk != null) {
+        remainingBudget -= chunk.getDataSize();
+      }
+      cursor++;
+    }
+    // everything that was not consumed above belongs to the second half
+    allData.subList(cursor, allData.size()).forEach(secondHalf::addTsFileData);
+
+    pieceNodes.add(firstHalf);
+    boolean wasSplit = !secondHalf.getAllTsFileData().isEmpty();
+    if (wasSplit) {
+      pieceNodes.add(secondHalf);
+    }
+    return wasSplit;
+  }
 
   private List<LoadTsFilePieceNode> clusterPieceNode(
       Map<String, LoadTsFilePieceNode> measurementPieceNodeMap) {
     // convert to feature vector
-    Map<String, SeriesFeatureVector> measurementVectorMap = new HashMap<>(
-        measurementPieceNodeMap.size());
+    Map<String, SeriesFeatureVector> measurementVectorMap =
+        new HashMap<>(measurementPieceNodeMap.size());
     for (Entry<String, LoadTsFilePieceNode> entry : 
measurementPieceNodeMap.entrySet()) {
       measurementVectorMap.put(entry.getKey(), 
convertToFeature(entry.getValue()));
     }
@@ -74,6 +228,10 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
       doubleVectors.put(e.getKey(), e.getValue().numericVector);
     }
     // clustering
+    int numCluster = Math.min((int) (doubleVectors.size() / groupFactor), 
MAX_CLUSTER_NUM);
+    if (numCluster < 1) {
+      numCluster = 1;
+    }
     VectorDistance distance = new EuclideanDistance();
     Clustering clustering = new KMeans(numCluster, maxIteration);
     List<List<String>> clusterResult = clustering.cluster(doubleVectors, 
distance);
@@ -84,8 +242,8 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
         continue;
       }
       LoadTsFilePieceNode pieceNode = 
measurementPieceNodeMap.get(cluster.get(0));
-      LoadTsFilePieceNode clusterNode = new 
LoadTsFilePieceNode(pieceNode.getPlanNodeId(),
-          pieceNode.getTsFile());
+      LoadTsFilePieceNode clusterNode =
+          new LoadTsFilePieceNode(pieceNode.getPlanNodeId(), 
pieceNode.getTsFile());
       for (String measurementId : cluster) {
         pieceNode = measurementPieceNodeMap.get(measurementId);
         for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
@@ -128,22 +286,36 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
       maxNumOfPages = Math.max(maxNumOfPages, vector.numOfPages);
     }
 
-    SimilarityStrategy strategy = new JaroWinklerStrategy();
+    SimilarityStrategy strategy = new LevenshteinDistanceStrategy();
     StringSimilarityService service = new 
StringSimilarityServiceImpl(strategy);
-    for (SeriesFeatureVector vector : vectors) {
-      vector.numericVector[0] = service.score(firstMeasurementId, 
vector.measurementId);
-      vector.numericVector[1] = (vector.dataSize - minDataSize) * 1.0 / 
(maxDataSize - minDataSize);
+    int finalMinDataSize = minDataSize;
+    String finalFirstMeasurementId = firstMeasurementId;
+    int finalMaxDataSize = maxDataSize;
+    int finalMinDataType = minDataType;
+    int finalMaxDataType = maxDataType;
+    int finalMinCompressionType = minCompressionType;
+    int finalMaxCompressionType = maxCompressionType;
+    int finalMinEncodingType = minEncodingType;
+    int finalMaxEncodingType = maxEncodingType;
+    int finalMinNumOfPages = minNumOfPages;
+    int finalMaxNumOfPages = maxNumOfPages;
+    String finalFirstDataSample = firstDataSample;
+    vectors.stream().parallel().forEach(vector -> {
+      vector.numericVector[0] = service.score(finalFirstMeasurementId, 
vector.measurementId);
+      vector.numericVector[1] = (vector.dataSize - finalMinDataSize) * 1.0 / 
(finalMaxDataSize - finalMinDataSize);
       vector.numericVector[2] =
-          (vector.dataType.ordinal() - minDataType) * 1.0 / (maxDataType - 
minDataType);
+          (vector.dataType.ordinal() - finalMinDataType) * 1.0 / 
(finalMaxDataType - finalMinDataType);
       vector.numericVector[3] =
-          (vector.compressionType.ordinal() - minCompressionType) * 1.0 / 
(maxCompressionType
-              - minCompressionType);
+          (vector.compressionType.ordinal() - finalMinCompressionType)
+              * 1.0
+              / (finalMaxCompressionType - finalMinCompressionType);
       vector.numericVector[4] =
-          (vector.encodingType.ordinal() - minEncodingType) * 1.0 / 
(maxEncodingType
-              - minEncodingType);
+          (vector.encodingType.ordinal() - finalMinEncodingType)
+              * 1.0
+              / (finalMaxEncodingType - finalMinEncodingType);
       vector.numericVector[5] =
-          (vector.numOfPages - minNumOfPages) * 1.0 / (maxNumOfPages - 
minNumOfPages);
-      vector.numericVector[6] = service.score(firstDataSample, 
vector.dataSample);
+          (vector.numOfPages - finalMinNumOfPages) * 1.0 / (finalMaxNumOfPages 
- finalMinNumOfPages);
+      vector.numericVector[6] = service.score(finalFirstDataSample, 
vector.dataSample);
       double[] numericVector = vector.numericVector;
       for (int i = 0; i < numericVector.length; i++) {
         if (Double.isNaN(numericVector[i])) {
@@ -152,7 +324,7 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
           numericVector[i] = 1.0;
         }
       }
-    }
+    });
   }
 
   private SeriesFeatureVector convertToFeature(LoadTsFilePieceNode pieceNode) {
@@ -222,8 +394,11 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
       vector.compressionType = chunkHeader.getCompressionType();
       vector.encodingType = chunkHeader.getEncodingType();
       vector.numOfPages = chunkHeader.getNumOfPages();
-      vector.dataSample = new String(chunkBuffer.array(),
-          chunkBuffer.arrayOffset() + chunkBuffer.position(), 
chunkBuffer.remaining());
+      vector.dataSample =
+          new String(
+              chunkBuffer.array(),
+              chunkBuffer.arrayOffset() + chunkBuffer.position(),
+              chunkBuffer.remaining());
       return vector;
     }
 
@@ -252,9 +427,9 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
     public double calculate(double[] v1, double[] v2) {
       double sum = 0;
       for (int i = 0; i < v1.length; i++) {
-        sum += Math.pow(v1[i] - v2[i], 2);
+        sum += (v1[i] - v2[i]) * (v1[i] - v2[i]);
       }
-      return Math.sqrt(sum);
+      return Math.sqrt(sum) + 1;
     }
   }
 
@@ -263,11 +438,12 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
     List<List<String>> cluster(Map<String, double[]> tagVectorMap, 
VectorDistance distance);
   }
 
-  public static class KMeans implements Clustering {
+  public class KMeans implements Clustering {
 
     private int k;
     private int maxIteration;
     private double[][] centroids;
+    private AtomicInteger[] centroidCounters;
     private Random random;
     private Map<Entry<String, double[]>, Integer> recordCentroidMapping;
     private int vecLength = 0;
@@ -276,10 +452,20 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
       this.k = k;
       this.maxIteration = maxIteration;
       this.centroids = new double[k][];
+      this.centroidCounters = new AtomicInteger[k];
+      for (int i = 0; i < centroidCounters.length; i++) {
+        centroidCounters[i] = new AtomicInteger();
+      }
       this.random = new Random();
       this.recordCentroidMapping = new ConcurrentHashMap<>();
     }
 
+    /** Resets every per-centroid counter to zero. */
+    private void clearCentroidCounter() {
+      for (int idx = 0; idx < centroidCounters.length; idx++) {
+        centroidCounters[idx].set(0);
+      }
+    }
+
     @Override
     public List<List<String>> cluster(Map<String, double[]> tagVectorMap, 
VectorDistance distance) {
       recordCentroidMapping.clear();
@@ -299,70 +485,93 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
           break;
         }
         newCentroid();
+        clearCentroidCounter();
+        if (System.currentTimeMillis() - splitStartTime > splitTimeBudget) {
+          break;
+        }
       }
 
-      Map<Integer, List<Entry<String, double[]>>> centroidRecordMap = 
collectCentroidRecordMapping();
+      Map<Integer, List<Entry<String, double[]>>> centroidRecordMap =
+          collectCentroidRecordMapping();
       return centroidRecordMap.values().stream()
-          .map(l -> 
l.stream().map(Entry::getKey).collect(Collectors.toList())).collect(
-              Collectors.toList());
+          .map(l -> l.stream().map(Entry::getKey).collect(Collectors.toList()))
+          .collect(Collectors.toList());
     }
 
     private Map<Integer, List<Entry<String, double[]>>> 
collectCentroidRecordMapping() {
       Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping = new 
ConcurrentHashMap<>();
-      recordCentroidMapping.entrySet().stream().parallel()
-          .forEach(e -> centroidRecordMapping.compute(e.getValue(), (key, 
oldV) -> {
-            if (oldV == null) {
-              oldV = new ArrayList<>();
-            }
-            oldV.add(e.getKey());
-            return oldV;
-          }));
+      recordCentroidMapping.entrySet().stream()
+          .parallel()
+          .forEach(
+              e ->
+                  centroidRecordMapping.compute(
+                      e.getValue(),
+                      (key, oldV) -> {
+                        if (oldV == null) {
+                          oldV = new ArrayList<>();
+                        }
+                        oldV.add(e.getKey());
+                        return oldV;
+                      }));
       return centroidRecordMapping;
     }
 
     private void newCentroid() {
-      Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping = 
collectCentroidRecordMapping();
-      centroidRecordMapping.entrySet().stream().parallel().forEach(e -> {
-        Integer centroidId = e.getKey();
-        List<Entry<String, double[]>> records = e.getValue();
-        int recordNum = records.size();
-        double[] sumVec = new double[vecLength];
-        for (Entry<String, double[]> record : records) {
-          for (int i = 0; i < sumVec.length; i++) {
-            sumVec[i] += record.getValue()[i];
-          }
-        }
-        for (int i = 0; i < sumVec.length; i++) {
-          sumVec[i] = sumVec[i] / recordNum;
-        }
-        centroids[centroidId] = sumVec;
-      });
+      Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping =
+          collectCentroidRecordMapping();
+      centroidRecordMapping.entrySet().stream()
+          .parallel()
+          .forEach(
+              e -> {
+                Integer centroidId = e.getKey();
+                List<Entry<String, double[]>> records = e.getValue();
+                int recordNum = records.size();
+                double[] sumVec = new double[vecLength];
+                for (Entry<String, double[]> record : records) {
+                  for (int i = 0; i < sumVec.length; i++) {
+                    sumVec[i] += record.getValue()[i];
+                  }
+                }
+                for (int i = 0; i < sumVec.length; i++) {
+                  sumVec[i] = sumVec[i] / recordNum;
+                }
+                centroids[centroidId] = sumVec;
+              });
     }
 
     private boolean assignCentroid(Map<String, double[]> tagVectorMap, 
VectorDistance distance) {
       AtomicBoolean centroidUpdated = new AtomicBoolean(false);
-      tagVectorMap.entrySet().stream().parallel().forEach(e -> {
-        double[] vector = e.getValue();
-        double minDist = Double.MAX_VALUE;
-        int nearestCentroid = 0;
-        for (int i = 0; i < centroids.length; i++) {
-          double dist = distance.calculate(vector, centroids[i]);
-          if (dist < minDist) {
-            minDist = dist;
-            nearestCentroid = i;
-          }
-        }
-
-        int finalNearestCentroid = nearestCentroid;
-        recordCentroidMapping.compute(e, (t, oc) -> {
-          if (oc == null || oc != finalNearestCentroid) {
-            centroidUpdated.set(true);
-            return finalNearestCentroid;
-          } else {
-            return oc;
-          }
-        });
-      });
+      tagVectorMap.entrySet().stream()
+          .parallel()
+          .forEach(
+              e -> {
+                double[] vector = e.getValue();
+                double minDist = Double.MAX_VALUE;
+                int nearestCentroid = 0;
+                for (int i = 0; i < centroids.length; i++) {
+                  double dist =
+                      distance.calculate(vector, centroids[i]) * (1
+                          + centroidCounters[i].get() * 0.1);
+                  if (dist < minDist) {
+                    minDist = dist;
+                    nearestCentroid = i;
+                  }
+                }
+
+                centroidCounters[nearestCentroid].incrementAndGet();
+                int finalNearestCentroid = nearestCentroid;
+                // compute() must see the previous assignment, so no put() before it.
+                recordCentroidMapping.compute(
+                    e,
+                    (t, oc) -> {
+                      if (oc == null || oc != finalNearestCentroid) {
+                        centroidUpdated.set(true);
+                        return finalNearestCentroid;
+                      } else {
+                        return oc;
+                      }
+                    });
+              });
       return centroidUpdated.get();
     }
 
@@ -372,7 +581,7 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
     }
 
     private void pickRandomCentroid(Map<String, double[]> tagVectorMap) {
-      List<double[]> recordVectors = 
tagVectorMap.values().stream().collect(Collectors.toList());
+      List<double[]> recordVectors = new ArrayList<>(tagVectorMap.values());
       Collections.shuffle(recordVectors);
       for (int i = 0; i < k; i++) {
         centroids[i] = recordVectors.get(i);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/OrderedMeasurementSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/OrderedMeasurementSplitter.java
index dc3b2526f91..9081e8f52de 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/OrderedMeasurementSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/OrderedMeasurementSplitter.java
@@ -19,23 +19,26 @@
 
 package org.apache.iotdb.db.queryengine.execution.load.nodesplit;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
 import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 
-public class OrderedMeasurementSplitter implements PieceNodeSplitter{
+import java.util.ArrayList;
+import java.util.List;
+
+public class OrderedMeasurementSplitter implements PieceNodeSplitter {
 
   /**
    * Split a piece node by series.
+   *
    * @return a list of piece nodes, each associated to only one series.
    */
   @Override
   public List<LoadTsFilePieceNode> split(LoadTsFilePieceNode pieceNode) {
     List<LoadTsFilePieceNode> result = new ArrayList<>();
     String currMeasurement = null;
-    LoadTsFilePieceNode currNode = new 
LoadTsFilePieceNode(pieceNode.getPlanNodeId(), pieceNode.getTsFile());
+    LoadTsFilePieceNode currNode =
+        new LoadTsFilePieceNode(pieceNode.getPlanNodeId(), 
pieceNode.getTsFile());
     result.add(currNode);
     for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
       if (tsFileData.isModification()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/PieceNodeSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/PieceNodeSplitter.java
index 6aebc82d7cd..efa45edfea6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/PieceNodeSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/PieceNodeSplitter.java
@@ -19,9 +19,10 @@
 
 package org.apache.iotdb.db.queryengine.execution.load.nodesplit;
 
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+
 import java.util.Collections;
 import java.util.List;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 
 public interface PieceNodeSplitter {
   List<LoadTsFilePieceNode> split(LoadTsFilePieceNode pieceNode);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index d10130d9e2c..2eb3500eaa8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -36,7 +36,6 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
 import 
org.apache.iotdb.db.queryengine.execution.load.DataPartitionBatchFetcher;
 import org.apache.iotdb.db.queryengine.execution.load.TsFileDataManager;
 import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
-import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
index dbbbfea41bd..9f7a713b15d 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
@@ -19,21 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.stream.Collectors;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -57,40 +43,63 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.rpc.TSStatusCode;
+
 import org.junit.Test;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
 public class LoadTsFileSchedulerTest extends TestBase {
 
   protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, 
Set<Integer>>>>>
       phaseOneResults = new ConcurrentSkipListMap<>();
   // the third key is UUid, the value is command type
-  protected Map<TEndPoint, Map<String, Integer>> phaseTwoResults =
-      new ConcurrentSkipListMap<>();
+  protected Map<TEndPoint, Map<String, Integer>> phaseTwoResults = new 
ConcurrentSkipListMap<>();
 
   @Test
   public void test() {
     MPPQueryContext context = new MPPQueryContext(QueryId.MOCK_QUERY_ID);
 
     PlanFragmentId fragmentId = new 
PlanFragmentId("load_tsfile_scheduler_test", 0);
-    SubPlan subPlan = new SubPlan(
-        new PlanFragment(fragmentId, null));
+    SubPlan subPlan = new SubPlan(new PlanFragment(fragmentId, null));
     List<FragmentInstance> fragmentInstanceList = new ArrayList<>();
     for (int i = 0; i < tsFileResources.size(); i++) {
       TsFileResource tsFileResource = tsFileResources.get(i);
-      LoadSingleTsFileNode singleTsFileNode = new LoadSingleTsFileNode(
-          new PlanNodeId("load_tsfile_scheduler_test" + (i + 1)), 
tsFileResource, false);
-      fragmentInstanceList.add(new FragmentInstance(
-          new PlanFragment(fragmentId, singleTsFileNode),
-          new FragmentInstanceId(fragmentId, "" + i),
-          null, null, 0, null));
+      LoadSingleTsFileNode singleTsFileNode =
+          new LoadSingleTsFileNode(
+              new PlanNodeId("load_tsfile_scheduler_test" + (i + 1)), 
tsFileResource, false);
+      fragmentInstanceList.add(
+          new FragmentInstance(
+              new PlanFragment(fragmentId, singleTsFileNode),
+              new FragmentInstanceId(fragmentId, "" + i),
+              null,
+              null,
+              0,
+              null));
     }
-    DistributedQueryPlan queryPlan = new DistributedQueryPlan(context, 
subPlan, null,
-        fragmentInstanceList);
-
-    LoadTsFileScheduler scheduler = new LoadTsFileScheduler(queryPlan, context,
-        new QueryStateMachine(context.getQueryId(),
-            
IoTDBThreadPoolFactory.newCachedThreadPool("LoadTsFileSchedulerTest")),
-        internalServiceClientManager, dummyDataPartitionBatchFetcher(), false);
+    DistributedQueryPlan queryPlan =
+        new DistributedQueryPlan(context, subPlan, null, fragmentInstanceList);
+
+    LoadTsFileScheduler scheduler =
+        new LoadTsFileScheduler(
+            queryPlan,
+            context,
+            new QueryStateMachine(
+                context.getQueryId(),
+                
IoTDBThreadPoolFactory.newCachedThreadPool("LoadTsFileSchedulerTest")),
+            internalServiceClientManager,
+            dummyDataPartitionBatchFetcher(),
+            false);
 
     long start = System.currentTimeMillis();
     scheduler.start();
@@ -103,29 +112,33 @@ public class LoadTsFileSchedulerTest extends TestBase {
   public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint) {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
-    LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) 
PlanNodeType.deserialize(
-        req.body.slice());
+    LoadTsFilePieceNode pieceNode =
+        (LoadTsFilePieceNode) PlanNodeType.deserialize(req.body.slice());
     Set<Integer> splitIds =
         phaseOneResults
-            .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>(
-                Comparator.comparingInt(ConsensusGroupId::getId)))
+            .computeIfAbsent(
+                tEndpoint,
+                e -> new 
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
             .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
             .computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
             .computeIfAbsent(pieceNode.getTsFile(), f -> new 
ConcurrentSkipListSet<>());
-    
splitIds.addAll(pieceNode.getAllTsFileData().stream().map(TsFileData::getSplitId).collect(
-        Collectors.toList()));
+    splitIds.addAll(
+        pieceNode.getAllTsFileData().stream()
+            .map(TsFileData::getSplitId)
+            .collect(Collectors.toList()));
 
-    return new TLoadResp().setAccepted(true)
+    return new TLoadResp()
+        .setAccepted(true)
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint) {
     phaseTwoResults
-        .computeIfAbsent(tEndpoint,
-            e -> new ConcurrentSkipListMap<>())
+        .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>())
         .computeIfAbsent(req.uuid, id -> req.commandType);
 
-    return new TLoadResp().setAccepted(true)
+    return new TLoadResp()
+        .setAccepted(true)
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
@@ -134,8 +147,8 @@ public class LoadTsFileSchedulerTest extends TestBase {
     for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, 
Set<Integer>>>>>
         tEndPointMapEntry : phaseOneResults.entrySet()) {
       TEndPoint endPoint = tEndPointMapEntry.getKey();
-      for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>
-          consensusGroupIdMapEntry : tEndPointMapEntry.getValue().entrySet()) {
+      for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>> 
consensusGroupIdMapEntry :
+          tEndPointMapEntry.getValue().entrySet()) {
         ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
         int chunkNum = 0;
         int fileNum = 0;
@@ -150,27 +163,25 @@ public class LoadTsFileSchedulerTest extends TestBase {
           }
         }
         System.out.printf(
-            "%s - %s - %s tasks - %s files - %s chunks\n", endPoint, 
consensusGroupId, taskNum, fileNum, chunkNum);
-//        if (consensusGroupId.getId() == 0) {
-//          // d1, non-aligned series
-//          assertEquals(expectedChunkNum() / 2, chunkNum);
-//        } else {
-//          // d2, aligned series
-//          assertEquals(expectedChunkNum() / 2 / seriesNum, chunkNum);
-//        }
+            "%s - %s - %s tasks - %s files - %s chunks\n",
+            endPoint, consensusGroupId, taskNum, fileNum, chunkNum);
+        //        if (consensusGroupId.getId() == 0) {
+        //          // d1, non-aligned series
+        //          assertEquals(expectedChunkNum() / 2, chunkNum);
+        //        } else {
+        //          // d2, aligned series
+        //          assertEquals(expectedChunkNum() / 2 / seriesNum, chunkNum);
+        //        }
       }
     }
 
     System.out.print("Phase two:\n");
-    for (Entry<TEndPoint, Map<String, Integer>> tEndPointMapEntry :
-        phaseTwoResults.entrySet()) {
+    for (Entry<TEndPoint, Map<String, Integer>> tEndPointMapEntry : 
phaseTwoResults.entrySet()) {
       TEndPoint endPoint = tEndPointMapEntry.getKey();
-      for (Entry<String, Integer> stringMapEntry :
-          tEndPointMapEntry.getValue().entrySet()) {
+      for (Entry<String, Integer> stringMapEntry : 
tEndPointMapEntry.getValue().entrySet()) {
         String uuid = stringMapEntry.getKey();
         int command = stringMapEntry.getValue();
-        System.out.printf("%s - %s - %s\n", endPoint, uuid,
-            LoadCommand.values()[command]);
+        System.out.printf("%s - %s - %s\n", endPoint, uuid, 
LoadCommand.values()[command]);
         assertEquals(LoadCommand.EXECUTE.ordinal(), command);
       }
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
index 1d31c90ccc9..71efb778a47 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import static org.junit.Assert.assertEquals;
-
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
@@ -33,6 +31,8 @@ import java.util.List;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertEquals;
+
 public class MergedTsFileSplitterTest extends TestBase {
 
   private List<TsFileData> resultSet = new ArrayList<>();
@@ -52,7 +52,8 @@ public class MergedTsFileSplitterTest extends TestBase {
                 new SynchronousQueue<>(),
                 new IoTThreadFactory("MergedTsFileSplitter"),
                 "MergedTsFileSplitter"),
-            TimePartitionUtils.getTimePartitionInterval());
+            TimePartitionUtils.getTimePartitionInterval(),
+            fileNum);
     try {
       splitter.splitTsFileByDataPartition();
       for (TsFileData tsFileData : resultSet) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index aaa64b1e978..fab94c1751b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@ -19,16 +19,10 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
+
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -60,14 +54,19 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -78,6 +77,17 @@ import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 public class TestBase {
 
   private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
@@ -87,16 +97,21 @@ public class TestBase {
   public static final String TEST_TSFILE_PATH =
       BASE_OUTPUT_PATH + "testTsFile".concat(File.separator) + 
PARTIAL_PATH_STRING;
 
-  protected int fileNum = 500;
+  protected int fileNum = 100;
   // series number of each file, sn non-aligned series and 1 aligned series 
with sn measurements
   protected int seriesNum = 1000;
+  protected int deviceNum = 100;
   // number of chunks of each series in a file, each series has only one chunk 
in a file
-  protected double chunkTimeRangeRatio = 0.03;
+  protected double chunkTimeRangeRatio = 0.001;
   // the interval between two consecutive points of a series
   protected long pointInterval = 50_000;
-  protected List<DoubleSequenceGeneratorFactory> sequenceGeneratorFactories = 
Arrays.asList(
-      new SimpleDoubleSequenceGenerator.Factory(), new 
UniformDoubleSequenceGenerator.Factory(),
-      new GaussianDoubleSequenceGenerator.Factory());
+  protected List<Pair<String, DoubleSequenceGeneratorFactory>> 
measurementSequenceGeneratorPairs = Arrays.asList(
+      new Pair<>("Simple_", new SimpleDoubleSequenceGenerator.Factory()),
+      new Pair<>("UniformA_", new UniformDoubleSequenceGenerator.Factory(1.0)),
+      new Pair<>("GaussianA_", new 
GaussianDoubleSequenceGenerator.Factory(1.0, 1.0)),
+      new Pair<>("UniformB_", new 
UniformDoubleSequenceGenerator.Factory(10.0)),
+      new Pair<>("GaussianB_", new 
GaussianDoubleSequenceGenerator.Factory(15.0, 3.0))
+  );
   protected final List<File> files = new ArrayList<>();
   protected final List<TsFileResource> tsFileResources = new ArrayList<>();
   protected IPartitionFetcher partitionFetcher;
@@ -107,6 +122,8 @@ public class TestBase {
       internalServiceClientManager;
   // the third key is UUid, the forth key is targetFile
 
+  private int groupSizeInByte;
+
   @Before
   public void setup() throws IOException, WriteProcessException {
     setupFiles();
@@ -114,6 +131,8 @@ public class TestBase {
     partitionFetcher = dummyPartitionFetcher();
     setupPartitionTable();
     setupClientManager();
+    groupSizeInByte = 
TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte();
+    TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte((int) (GB));
   }
 
   @After
@@ -121,12 +140,14 @@ public class TestBase {
     for (File file : files) {
       file.delete();
     }
+    
TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(groupSizeInByte);
   }
 
   public int expectedChunkNum() {
     double totalTimeRange = chunkTimeRangeRatio * fileNum;
     int splitChunkNum = 0;
-    // if the boundary of the ith partition does not overlap a chunk, it 
introduces an additional split
+    // if the boundary of the ith partition does not overlap a chunk, it 
introduces an additional
+    // split
     // TODO: due to machine precision, the calculation may have error. Also, 
if the data amount is
     // too large, there could be more than one chunk for each series in the 
file.
     for (int i = 0; i <= totalTimeRange; i++) {
@@ -138,13 +159,15 @@ public class TestBase {
   }
 
   public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint)
-      throws TException {
-    return new TLoadResp().setAccepted(true)
+      throws TException, IOException {
+    return new TLoadResp()
+        .setAccepted(true)
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint) {
-    return new TLoadResp().setAccepted(true)
+    return new TLoadResp()
+        .setAccepted(true)
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
@@ -161,11 +184,14 @@ public class TestBase {
           public SyncDataNodeInternalServiceClient borrowClient(TEndPoint 
node) {
             try {
               return new SyncDataNodeInternalServiceClient(
-                  dummyProtocol(),
-                  new ThriftClientProperty.Builder().build(), node, this) {
+                  dummyProtocol(), new ThriftClientProperty.Builder().build(), 
node, this) {
                 @Override
                 public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) 
throws TException {
-                  return handleTsFilePieceNode(req, getTEndpoint());
+                  try {
+                    return handleTsFilePieceNode(req, getTEndpoint());
+                  } catch (IOException e) {
+                    throw new TException(e);
+                  }
                 }
 
                 @Override
@@ -193,8 +219,7 @@ public class TestBase {
   }
 
   public void setupPartitionTable() {
-    ConsensusGroupId d1GroupId = 
Factory.create(TConsensusGroupType.DataRegion.getValue(),
-        0);
+    ConsensusGroupId d1GroupId = 
Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
     TRegionReplicaSet d1Replicas =
         new TRegionReplicaSet(
             d1GroupId.convertToTConsensusGroupId(),
@@ -208,9 +233,11 @@ public class TestBase {
                 new TDataNodeLocation()
                     .setDataNodeId(2)
                     .setInternalEndPoint(new TEndPoint("localhost", 10002))));
-    partitionTable.put("d1", d1Replicas);
-    groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
+    for (int i = 0; i < deviceNum; i++) {
+      partitionTable.put("d" + i, d1Replicas);
+    }
 
+    groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
     ConsensusGroupId d2GroupId = 
Factory.create(TConsensusGroupType.DataRegion.getValue(), 1);
     TRegionReplicaSet d2Replicas =
         new TRegionReplicaSet(
@@ -225,7 +252,7 @@ public class TestBase {
                 new TDataNodeLocation()
                     .setDataNodeId(5)
                     .setInternalEndPoint(new TEndPoint("localhost", 10005))));
-    partitionTable.put("d2", d2Replicas);
+    partitionTable.put("dd1", d2Replicas);
     groupId2ReplicaSetMap.put(d2GroupId, d2Replicas);
   }
 
@@ -287,15 +314,28 @@ public class TestBase {
       @Override
       public List<TRegionReplicaSet> queryDataPartition(
           List<Pair<String, TTimePartitionSlot>> slotList) {
-        return slotList.stream()
-            .map(p -> partitionTable.get(p.left))
-            .collect(Collectors.toList());
+        return slotList.stream().map(p -> 
partitionTable.get(p.left)).collect(Collectors.toList());
       }
     };
   }
 
   public void setupFiles() {
-
+    
measurementSequenceGeneratorPairs.sort(Comparator.comparing(Pair::getLeft));
+    List<Pair<MeasurementSchema, DoubleSequenceGeneratorFactory>> 
schemaGeneratorPairs = new ArrayList<>();
+    for (int sn = 0; sn < seriesNum; sn++) {
+      Pair<String, DoubleSequenceGeneratorFactory> measurementGeneratorPair = 
measurementSequenceGeneratorPairs.get(
+          sn % measurementSequenceGeneratorPairs.size());
+      MeasurementSchema measurementSchema =
+          new MeasurementSchema(measurementGeneratorPair
+              .left + sn,
+              TSDataType.DOUBLE);
+      measurementSchema.setCompressor(CompressionType.LZ4.serialize());
+      measurementSchema.setEncoding(TSEncoding.PLAIN.serialize());
+      schemaGeneratorPairs.add(new Pair<>(measurementSchema, 
measurementGeneratorPair.right));
+    }
+    schemaGeneratorPairs.sort(Comparator.comparing(s -> 
s.left.getMeasurementId()));
+    List<MeasurementSchema> measurementSchemas = 
schemaGeneratorPairs.stream().map(m -> m.left).collect(
+        Collectors.toList());
     IntStream.range(0, fileNum)
         .parallel()
         .forEach(
@@ -309,45 +349,47 @@ public class TestBase {
 
                 try (TsFileWriter writer = new TsFileWriter(file)) {
                   // sn non-aligned series under d1 and 1 aligned series with 
sn measurements under
-                  // d2
-                  List<MeasurementSchema> measurementSchemas = new 
ArrayList<>();
+                  // dd2
                   for (int sn = 0; sn < seriesNum; sn++) {
-                    MeasurementSchema measurementSchema = new 
MeasurementSchema("s" + sn,
-                        TSDataType.DOUBLE);
-                    writer.registerTimeseries(
-                        new Path("d1"), measurementSchema);
-                    measurementSchemas.add(measurementSchema);
+                    for (int dn = 0; dn < deviceNum; dn++) {
+                      writer.registerTimeseries(new Path("d"+dn), schemaGeneratorPairs.get(sn).left);
+                    }
                   }
-                  writer.registerAlignedTimeseries(new Path("d2"), 
measurementSchemas);
+                  writer.registerAlignedTimeseries(new Path("dd1"), measurementSchemas);
 
                   // one chunk for each series
                   long timePartitionInterval = 
TimePartitionUtils.getTimePartitionInterval();
                   long chunkTimeRange = (long) (timePartitionInterval * 
chunkTimeRangeRatio);
                   int chunkPointNum = (int) (chunkTimeRange / pointInterval);
 
-                  Tablet tablet = new Tablet("d1", measurementSchemas, 
chunkPointNum);
+                  Tablet tablet = new Tablet("d0", measurementSchemas, chunkPointNum);
                   for (int sn = 0; sn < seriesNum; sn++) {
-                    DoubleSequenceGenerator sequenceGenerator = 
sequenceGeneratorFactories.get(
-                        sn % sequenceGeneratorFactories.size()).create();
+                    DoubleSequenceGenerator sequenceGenerator =
+                        schemaGeneratorPairs.get(sn).right
+                            .create();
                     for (int pn = 0; pn < chunkPointNum; pn++) {
                       if (sn == 0) {
                         long currTime = chunkTimeRange * i + pointInterval * 
pn;
                         tablet.addTimestamp(pn, currTime);
                       }
-                      tablet.addValue("s" + sn, pn, sequenceGenerator.gen(pn));
+                      tablet.addValue(schemaGeneratorPairs.get(sn).left.getMeasurementId(), pn,
+                          sequenceGenerator.gen(pn));
                     }
                   }
 
                   tablet.rowSize = chunkPointNum;
-                  writer.write(tablet);
+                  for (int dn = 0; dn < deviceNum; dn++) {
+                    tablet.deviceId = "d" + dn;
+                    writer.write(tablet);
+                  }
                   tablet.deviceId = "d2";
-                  //writer.writeAligned(tablet);
+                  // writer.writeAligned(tablet);
 
                   writer.flushAllChunkGroups();
-                  tsFileResource.updateStartTime("d1", chunkTimeRange * i);
-                  tsFileResource.updateStartTime("d2", chunkTimeRange * i);
-                  tsFileResource.updateEndTime("d1", chunkTimeRange * (i + 1));
-                  tsFileResource.updateEndTime("d2", chunkTimeRange * (i + 1));
+                  for (int dn = 0; dn < deviceNum; dn++) {
+                    tsFileResource.updateStartTime("d"+dn, chunkTimeRange * i);
+                    tsFileResource.updateEndTime("d"+dn, chunkTimeRange * (i + 1));
+                  }
                 }
 
                 tsFileResource.close();
@@ -358,6 +400,7 @@ public class TestBase {
                 throw new RuntimeException(e);
               }
             });
+    tsFileResources.sort(Comparator.comparingLong(TsFileResource::getVersion));
   }
 
   public static String getTestTsFilePath(
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index 819daa9d66b..818e3f80d25 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -19,19 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -47,12 +35,31 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.utils.Pair;
+
 import org.apache.thrift.TException;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
+import static org.junit.Assert.assertEquals;
+
 public class TsFileSplitSenderTest extends TestBase {
 
   private static final Logger logger = 
LoggerFactory.getLogger(TsFileSplitSenderTest.class);
@@ -69,18 +76,38 @@ public class TsFileSplitSenderTest extends TestBase {
   // simulating jvm stall like GC
   private long minStuckIntervalMS = 50000;
   private long maxStuckIntervalMS = 100000;
-  private long stuckDurationMS = 10000;
+  private long stuckDurationMS = 0;
 
-  private long nodeThroughput = 500000;
+  private long nodeThroughput = 10_000;
 
   protected Map<TEndPoint, Pair<Long, Long>> nextStuckTimeMap = new 
ConcurrentHashMap<>();
+  private AtomicLong sumHandleTime = new AtomicLong();
+  private AtomicLong decompressTime = new AtomicLong();
+  private AtomicLong deserializeTime = new AtomicLong();
+  private AtomicLong relayTime = new AtomicLong();
+  private AtomicLong maxMemoryUsage = new AtomicLong();
 
   @Test
   public void test() throws IOException {
+    Thread thread = new Thread(() -> {
+      while (!Thread.interrupted()) {
+        long preUsage = maxMemoryUsage.get();
+        long newUsage = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+        if (preUsage < newUsage) {
+          maxMemoryUsage.set(newUsage);
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    });
+    thread.start();
+
     LoadTsFileNode loadTsFileNode =
         new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
-    DataPartitionBatchFetcher partitionBatchFetcher =
-        dummyDataPartitionBatchFetcher();
+    DataPartitionBatchFetcher partitionBatchFetcher = dummyDataPartitionBatchFetcher();
     TsFileSplitSender splitSender =
         new TsFileSplitSender(
             loadTsFileNode,
@@ -88,30 +115,44 @@ public class TsFileSplitSenderTest extends TestBase {
             TimePartitionUtils.getTimePartitionInterval(),
             internalServiceClientManager,
             false,
-            maxSplitSize);
+            maxSplitSize,
+            100);
     long start = System.currentTimeMillis();
     splitSender.start();
     long timeConsumption = System.currentTimeMillis() - start;
 
     printPhaseResult();
-    System.out.printf("Split ends after %dms", timeConsumption);
+    long transmissionTime = splitSender.getStatistic().compressedSize.get() / nodeThroughput;
+    System.out.printf("Split ends after %dms + %dms (Transmission) = %dms\n", timeConsumption,
+        transmissionTime, timeConsumption + transmissionTime);
+    System.out.printf("Handle sum %dns\n", sumHandleTime.get());
+    System.out.printf("Decompress sum %dns\n", decompressTime.get());
+    System.out.printf("Deserialize sum %dns\n", deserializeTime.get());
+    System.out.printf("Relay sum %dns\n", relayTime.get());
+    System.out.printf("Memory usage %dMB\n", maxMemoryUsage.get() / MB);
   }
 
   public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint)
-      throws TException {
-    if ((tEndpoint.getPort() - 10000) % 3 == 0 && random.nextDouble() < 
packetLossRatio
+      throws TException, IOException {
+    long handleStart = System.nanoTime();
+    if ((tEndpoint.getPort() - 10000) % 3 == 0
+        && random.nextDouble() < packetLossRatio
         && req.isRelay) {
       throw new TException("Packet lost");
     }
-    if ((tEndpoint.getPort() - 10000) % 3 == 1 && random.nextDouble() < 
packetLossRatio / 2
+    if ((tEndpoint.getPort() - 10000) % 3 == 1
+        && random.nextDouble() < packetLossRatio / 2
         && req.isRelay) {
       throw new TException("Packet lost");
     }
 
-    if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
-      Pair<Long, Long> nextStuckTime = 
nextStuckTimeMap.computeIfAbsent(tEndpoint,
-          e -> new Pair<>(System.currentTimeMillis(),
-              System.currentTimeMillis() + stuckDurationMS));
+    if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay && 
stuckDurationMS > 0) {
+      Pair<Long, Long> nextStuckTime =
+          nextStuckTimeMap.computeIfAbsent(
+              tEndpoint,
+              e ->
+                  new Pair<>(
+                      System.currentTimeMillis(), System.currentTimeMillis() + 
stuckDurationMS));
       long currTime = System.currentTimeMillis();
       if (currTime >= nextStuckTime.left && currTime < nextStuckTime.right) {
         logger.debug("Node{} stalls", tEndpoint.getPort() - 10000);
@@ -121,38 +162,52 @@ public class TsFileSplitSenderTest extends TestBase {
           throw new RuntimeException(e);
         }
       } else if (currTime > nextStuckTime.right) {
-        nextStuckTimeMap.compute(tEndpoint, (endPoint, newInterval) -> {
-          if (newInterval != null && currTime < newInterval.right) {
-            return newInterval;
-          }
-          long start = currTime + minStuckIntervalMS + random.nextInt(
-              (int) (maxStuckIntervalMS - minStuckIntervalMS));
-          return new Pair<>(start, start + stuckDurationMS);
-        });
+        nextStuckTimeMap.compute(
+            tEndpoint,
+            (endPoint, newInterval) -> {
+              if (newInterval != null && currTime < newInterval.right) {
+                return newInterval;
+              }
+              long start =
+                  currTime
+                      + minStuckIntervalMS
+                      + random.nextInt((int) (maxStuckIntervalMS - 
minStuckIntervalMS));
+              return new Pair<>(start, start + stuckDurationMS);
+            });
       }
     }
 
+    long decompressStart = System.nanoTime();
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
-    LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) 
PlanNodeType.deserialize(
-        req.body.slice());
+    ByteBuffer buf = req.body.slice();
+    if (req.isSetCompressionType()) {
+      CompressionType compressionType = CompressionType.deserialize(req.compressionType);
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType);
+      int uncompressedLength = req.getUncompressedLength();
+      ByteBuffer allocate = ByteBuffer.allocate(uncompressedLength);
+      unCompressor.uncompress(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(),
+          allocate.array(), 0);
+      allocate.limit(uncompressedLength);
+      buf = allocate;
+    }
+    decompressTime.addAndGet(System.nanoTime() - decompressStart);
+
+    long deserializeStart = System.nanoTime();
+    LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(buf);
+    deserializeTime.addAndGet(System.nanoTime() - deserializeStart);
     Set<Integer> splitIds =
         phaseOneResults
-            .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>(
-                Comparator.comparingInt(ConsensusGroupId::getId)))
+            .computeIfAbsent(
+                tEndpoint,
+                e -> new 
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
             .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
             .computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
             .computeIfAbsent(pieceNode.getTsFile(), f -> new 
ConcurrentSkipListSet<>());
-    
splitIds.addAll(pieceNode.getAllTsFileData().stream().map(TsFileData::getSplitId).collect(
-        Collectors.toList()));
-
-    synchronized (tEndpoint) {
-      try {
-        Thread.sleep(pieceNode.getDataSize() / nodeThroughput);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
+    splitIds.addAll(
+        pieceNode.getAllTsFileData().stream()
+            .map(TsFileData::getSplitId)
+            .collect(Collectors.toList()));
 
     if (dummyDelayMS > 0) {
       if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
@@ -173,17 +228,25 @@ public class TsFileSplitSenderTest extends TestBase {
 
     // forward to other replicas in the group
     if (req.isRelay) {
+      long relayStart = System.nanoTime();
       req.isRelay = false;
       TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
-      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+      regionReplicaSet.getDataNodeLocations().stream().parallel().forEach(dataNodeLocation -> {
         TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
         if (!otherPoint.equals(tEndpoint)) {
-          handleTsFilePieceNode(req, otherPoint);
+          try {
+            handleTsFilePieceNode(req, otherPoint);
+          } catch (TException | IOException e) {
+            throw new RuntimeException(e);
+          }
         }
-      }
+      });
+      relayTime.addAndGet(System.nanoTime() - relayStart);
     }
 
-    return new TLoadResp().setAccepted(true)
+    sumHandleTime.addAndGet(System.nanoTime() - handleStart);
+    return new TLoadResp()
+        .setAccepted(true)
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
@@ -191,7 +254,8 @@ public class TsFileSplitSenderTest extends TestBase {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
     phaseTwoResults
-        .computeIfAbsent(tEndpoint,
+        .computeIfAbsent(
+            tEndpoint,
             e -> new 
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
         .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
         .computeIfAbsent(req.uuid, id -> req.commandType);
@@ -208,7 +272,8 @@ public class TsFileSplitSenderTest extends TestBase {
       }
     }
 
-    return new TLoadResp().setAccepted(true)
+    return new TLoadResp()
+        .setAccepted(true)
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
@@ -217,8 +282,8 @@ public class TsFileSplitSenderTest extends TestBase {
     for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, 
Set<Integer>>>>>
         tEndPointMapEntry : phaseOneResults.entrySet()) {
       TEndPoint endPoint = tEndPointMapEntry.getKey();
-      for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>
-          consensusGroupIdMapEntry : tEndPointMapEntry.getValue().entrySet()) {
+      for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>> 
consensusGroupIdMapEntry :
+          tEndPointMapEntry.getValue().entrySet()) {
         ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
         for (Entry<String, Map<File, Set<Integer>>> stringMapEntry :
             consensusGroupIdMapEntry.getValue().entrySet()) {
@@ -227,15 +292,15 @@ public class TsFileSplitSenderTest extends TestBase {
             File tsFile = fileListEntry.getKey();
             Set<Integer> chunks = fileListEntry.getValue();
             System.out.printf(
-                "%s - %s - %s - %s - %s chunks\n", endPoint, consensusGroupId, 
uuid, tsFile,
-                chunks.size());
-//            if (consensusGroupId.getId() == 0) {
-//              // d1, non-aligned series
-//              assertEquals(expectedChunkNum() / 2, chunks.size());
-//            } else {
-//              // d2, aligned series
-//              assertEquals(expectedChunkNum() / 2 / seriesNum, 
chunks.size());
-//            }
+                "%s - %s - %s - %s - %s chunks\n",
+                endPoint, consensusGroupId, uuid, tsFile, chunks.size());
+            //            if (consensusGroupId.getId() == 0) {
+            //              // d1, non-aligned series
+            //              assertEquals(expectedChunkNum() / 2, 
chunks.size());
+            //            } else {
+            //              // d2, aligned series
+            //              assertEquals(expectedChunkNum() / 2 / seriesNum, 
chunks.size());
+            //            }
           }
         }
       }
@@ -252,8 +317,9 @@ public class TsFileSplitSenderTest extends TestBase {
             consensusGroupIdMapEntry.getValue().entrySet()) {
           String uuid = stringMapEntry.getKey();
           int command = stringMapEntry.getValue();
-          System.out.printf("%s - %s - %s - %s\n", endPoint, consensusGroupId, 
uuid,
-              LoadCommand.values()[command]);
+          System.out.printf(
+              "%s - %s - %s - %s\n",
+              endPoint, consensusGroupId, uuid, LoadCommand.values()[command]);
           assertEquals(LoadCommand.EXECUTE.ordinal(), command);
         }
       }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
index 62b5995b61b..0d307da377e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import static org.junit.Assert.assertEquals;
-
 import org.junit.Test;
 
 import java.io.File;
@@ -28,6 +26,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 public class TsFileSplitterTest extends TestBase {
   private List<TsFileData> resultSet = new ArrayList<>();
 
@@ -44,7 +44,8 @@ public class TsFileSplitterTest extends TestBase {
       // System.out.println(tsFileData);
     }
     System.out.printf(
-        "%d/%d splits after %dms\n", resultSet.size(), expectedChunkNum(), 
System.currentTimeMillis() - start);
+        "%d/%d splits after %dms\n",
+        resultSet.size(), expectedChunkNum(), System.currentTimeMillis() - 
start);
     assertEquals(resultSet.size(), expectedChunkNum());
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SequenceUtils.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SequenceUtils.java
index 494737f0446..9adc5db6202 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SequenceUtils.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SequenceUtils.java
@@ -36,7 +36,7 @@ public class SequenceUtils {
       return id * 1.0;
     }
 
-    public static class Factory implements DoubleSequenceGeneratorFactory{
+    public static class Factory implements DoubleSequenceGeneratorFactory {
       @Override
       public DoubleSequenceGenerator create() {
         return new SimpleDoubleSequenceGenerator();
@@ -46,32 +46,60 @@ public class SequenceUtils {
 
   public static class UniformDoubleSequenceGenerator implements 
DoubleSequenceGenerator {
 
+    private double bound;
     private Random random = new Random();
+
+    public UniformDoubleSequenceGenerator(double bound) {
+      this.bound = bound;
+    }
+
     @Override
     public double gen(int id) {
-      return random.nextDouble();
+      return random.nextDouble() * bound;
     }
 
-    public static class Factory implements DoubleSequenceGeneratorFactory{
+    public static class Factory implements DoubleSequenceGeneratorFactory {
+      private double bound;
+
+      public Factory(double bound) {
+        this.bound = bound;
+      }
+
       @Override
       public DoubleSequenceGenerator create() {
-        return new UniformDoubleSequenceGenerator();
+        return new UniformDoubleSequenceGenerator(bound);
       }
     }
   }
 
   public static class GaussianDoubleSequenceGenerator implements 
DoubleSequenceGenerator {
 
+    private double mean;
+    private double stderr;
     private Random random = new Random();
+
+    public GaussianDoubleSequenceGenerator(double mean, double stderr) {
+      this.mean = mean;
+      this.stderr = stderr;
+    }
+
     @Override
     public double gen(int id) {
-      return random.nextGaussian();
+      return mean + random.nextGaussian() * stderr;
     }
 
-    public static class Factory implements DoubleSequenceGeneratorFactory{
+    public static class Factory implements DoubleSequenceGeneratorFactory {
+      private double mean;
+      private double stderr;
+
+      public Factory(double mean, double stderr) {
+        this.mean = mean;
+        this.stderr = stderr;
+      }
+
       @Override
       public DoubleSequenceGenerator create() {
-        return new GaussianDoubleSequenceGenerator();
+        return new GaussianDoubleSequenceGenerator(mean, stderr);
       }
     }
   }
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 0cd4313e43f..e2868734c67 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -310,6 +310,8 @@ struct TTsFilePieceReq{
     3: required common.TConsensusGroupId consensusGroupId
     // if isRelay is true, the receiver should forward the request to other replicas in the group
     4: optional bool isRelay
+    5: optional i8 compressionType
+    6: optional i32 uncompressedLength
 }
 
 struct TLoadCommandReq{

Reply via email to