This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/load_v2 by this push:
     new e852ea367d5 add TsFileSplitSender
e852ea367d5 is described below

commit e852ea367d5aa139016409592038a439f427ffae
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Sep 6 11:28:50 2023 +0800

    add TsFileSplitSender
---
 .../execution/load/AlignedChunkData.java           |  16 ++
 .../execution/load/DataPartitionBatchFetcher.java  |  70 +++++
 .../queryengine/execution/load/DeletionData.java   |  11 +
 .../execution/load/MergedTsFileSplitter.java       | 105 ++++---
 .../execution/load/NonAlignedChunkData.java        |  16 ++
 .../db/queryengine/execution/load/TsFileData.java  |   4 +
 .../execution/load/TsFileDataManager.java          | 162 +++++++++++
 .../execution/load/TsFileSplitSender.java          | 296 +++++++++++++++++++
 .../queryengine/execution/load/TsFileSplitter.java |  18 +-
 .../planner/plan/node/load/LoadTsFileNode.java     |   8 +
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |   2 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 185 ++----------
 .../apache/iotdb/db/utils/TimePartitionUtils.java  |   6 +
 .../execution/load/MergedTsFileSplitterTest.java   |  35 ++-
 .../db/queryengine/execution/load/TestBase.java    | 317 ++++++++++++++++++---
 .../execution/load/TsFileSplitSenderTest.java      | 186 ++++++++++++
 .../execution/load/TsFileSplitterTest.java         |  15 +-
 .../apache/iotdb/commons/client/ClientManager.java |   2 +-
 .../sync/SyncDataNodeInternalServiceClient.java    |  15 +
 .../src/main/thrift/datanode.thrift                |   4 +
 20 files changed, 1200 insertions(+), 273 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
index 6fb25fec0c7..630ba5f2fcb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
@@ -72,6 +72,7 @@ public class AlignedChunkData implements ChunkData {
 
   private AlignedChunkWriterImpl chunkWriter;
   private List<Chunk> chunkList;
+  private int splitId;
 
   public AlignedChunkData(
       String device, ChunkHeader chunkHeader, TTimePartitionSlot 
timePartitionSlot) {
@@ -151,6 +152,7 @@ public class AlignedChunkData implements ChunkData {
     ReadWriteIOUtils.write(isAligned(), stream);
     serializeAttr(stream);
     byteStream.writeTo(stream);
+    ReadWriteIOUtils.write(splitId, stream);
     close();
   }
 
@@ -426,6 +428,8 @@ public class AlignedChunkData implements ChunkData {
     chunkData.pageNumbers = pageNumbers;
     chunkData.deserializeTsFileData(stream);
     chunkData.close();
+
+    chunkData.setSplitId(ReadWriteIOUtils.readInt(stream));
     return chunkData;
   }
 
@@ -453,6 +457,18 @@ public class AlignedChunkData implements ChunkData {
         + dataSize
         + ", needDecodeChunk="
         + needDecodeChunk
+        + ", splitId="
+        + splitId
         + '}';
   }
+
+  @Override
+  public int getSplitId() {
+    return splitId;
+  }
+
+  @Override
+  public void setSplitId(int sid) {
+    this.splitId = sid;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
new file mode 100644
index 00000000000..f969a2746a6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DataPartitionBatchFetcher {
+
+  private final IPartitionFetcher fetcher;
+
+  public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
+    this.fetcher = fetcher;
+  }
+
+  public List<TRegionReplicaSet> queryDataPartition(
+      List<Pair<String, TTimePartitionSlot>> slotList) {
+    List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+    int size = slotList.size();
+
+    for (int i = 0; i < size; i += LoadTsFileScheduler.TRANSMIT_LIMIT) {
+      List<Pair<String, TTimePartitionSlot>> subSlotList =
+          slotList.subList(i, Math.min(size, i + 
LoadTsFileScheduler.TRANSMIT_LIMIT));
+      DataPartition dataPartition = 
fetcher.getOrCreateDataPartition(toQueryParam(subSlotList));
+      replicaSets.addAll(
+          subSlotList.stream()
+              .map(pair -> 
dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right))
+              .collect(Collectors.toList()));
+    }
+    return replicaSets;
+  }
+
+  private List<DataPartitionQueryParam> toQueryParam(List<Pair<String, 
TTimePartitionSlot>> slots) {
+    return slots.stream()
+        .collect(
+            Collectors.groupingBy(
+                Pair::getLeft, Collectors.mapping(Pair::getRight, 
Collectors.toSet())))
+        .entrySet()
+        .stream()
+        .map(
+            entry -> new DataPartitionQueryParam(entry.getKey(), new 
ArrayList<>(entry.getValue())))
+        .collect(Collectors.toList());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
index 3f486af4adb..973cd73e5a3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
@@ -33,6 +33,7 @@ import java.io.InputStream;
 
 public class DeletionData implements TsFileData {
   private final Deletion deletion;
+  private int splitId;
 
   public DeletionData(Deletion deletion) {
     this.deletion = deletion;
@@ -69,4 +70,14 @@ public class DeletionData implements TsFileData {
       throws IllegalPathException, IOException {
     return new DeletionData(Deletion.deserializeWithoutFileOffset(new 
DataInputStream(stream)));
   }
+
+  @Override
+  public int getSplitId() {
+    return splitId;
+  }
+
+  @Override
+  public void setSplitId(int sid) {
+    this.splitId = sid;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
index 1a969ebe766..c5d4700823d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
@@ -19,18 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import java.io.DataOutputStream;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -55,11 +44,12 @@ import 
org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
 import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -73,6 +63,13 @@ import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 public class MergedTsFileSplitter {
@@ -83,16 +80,22 @@ public class MergedTsFileSplitter {
   private final Function<TsFileData, Boolean> consumer;
   private final PriorityQueue<SplitTask> taskPriorityQueue;
   private ExecutorService asyncExecutor;
-
-  public MergedTsFileSplitter(List<File> tsFiles, Function<TsFileData, 
Boolean> consumer, ExecutorService asyncExecutor) {
+  private long timePartitionInterval;
+  private AtomicInteger splitIdGenerator = new AtomicInteger();
+
+  public MergedTsFileSplitter(
+      List<File> tsFiles,
+      Function<TsFileData, Boolean> consumer,
+      ExecutorService asyncExecutor,
+      long timePartitionInterval) {
     this.tsFiles = tsFiles;
     this.consumer = consumer;
     this.asyncExecutor = asyncExecutor;
+    this.timePartitionInterval = timePartitionInterval;
     taskPriorityQueue = new PriorityQueue<>();
   }
 
-  public void splitTsFileByDataPartition()
-      throws IOException, IllegalStateException {
+  public void splitTsFileByDataPartition() throws IOException, 
IllegalStateException {
     for (File tsFile : tsFiles) {
       SplitTask splitTask = new SplitTask(tsFile, asyncExecutor);
       if (splitTask.hasNext()) {
@@ -115,6 +118,7 @@ public class MergedTsFileSplitter {
 
       for (SplitTask equalTask : equalTasks) {
         TsFileData tsFileData = equalTask.removeNext();
+        tsFileData.setSplitId(splitIdGenerator.incrementAndGet());
         consumer.apply(tsFileData);
         if (equalTask.hasNext()) {
           taskPriorityQueue.add(equalTask);
@@ -203,15 +207,17 @@ public class MergedTsFileSplitter {
 
       nextSplits = new LinkedBlockingDeque<>(64);
       if (asyncExecutor != null) {
-        asyncTask = asyncExecutor.submit(() -> {
-          try {
-            asyncComputeNext();
-          } catch (Throwable e) {
-            logger.info("Exception during splitting", e);
-            throw e;
-          }
-          return null;
-        });
+        asyncTask =
+            asyncExecutor.submit(
+                () -> {
+                  try {
+                    asyncComputeNext();
+                  } catch (Throwable e) {
+                    logger.info("Exception during splitting", e);
+                    throw e;
+                  }
+                  return null;
+                });
       }
     }
 
@@ -336,7 +342,8 @@ public class MergedTsFileSplitter {
                     == TsFileConstant.TIME_COLUMN_MASK);
             IChunkMetadata chunkMetadata = 
offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
             TTimePartitionSlot timePartitionSlot =
-                
TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime());
+                TimePartitionUtils.getTimePartition(
+                    chunkMetadata.getStartTime(), timePartitionInterval);
             ChunkData chunkData =
                 ChunkData.createChunkData(isAligned, curDevice, header, 
timePartitionSlot);
 
@@ -379,7 +386,7 @@ public class MergedTsFileSplitter {
                         ? chunkMetadata.getStartTime()
                         : pageHeader.getStartTime();
                 TTimePartitionSlot pageTimePartitionSlot =
-                    TimePartitionUtils.getTimePartition(startTime);
+                    TimePartitionUtils.getTimePartition(startTime, 
timePartitionInterval);
                 if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
                   if (!isAligned) {
                     insertNewChunk(chunkData);
@@ -406,9 +413,7 @@ public class MergedTsFileSplitter {
                 }
 
                 int start = 0;
-                long endTime =
-                    timePartitionSlot.getStartTime()
-                        + TimePartitionUtils.getTimePartitionInterval();
+                long endTime = timePartitionSlot.getStartTime() + 
timePartitionInterval;
                 for (int i = 0; i < times.length; i++) {
                   if (times[i] >= endTime) {
                     chunkData.writeDecodePage(times, values, start, i);
@@ -420,10 +425,9 @@ public class MergedTsFileSplitter {
                       insertNewChunk(chunkData);
                     }
 
-                    timePartitionSlot = 
TimePartitionUtils.getTimePartition(times[i]);
-                    endTime =
-                        timePartitionSlot.getStartTime()
-                            + TimePartitionUtils.getTimePartitionInterval();
+                    timePartitionSlot =
+                        TimePartitionUtils.getTimePartition(times[i], 
timePartitionInterval);
+                    endTime = timePartitionSlot.getStartTime() + 
timePartitionInterval;
                     chunkData =
                         ChunkData.createChunkData(isAligned, curDevice, 
header, timePartitionSlot);
                     start = i;
@@ -531,9 +535,7 @@ public class MergedTsFileSplitter {
       }
 
       @Override
-      public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
-
-      }
+      public void writeToFileWriter(TsFileIOWriter writer) throws IOException 
{}
 
       @Override
       public boolean isModification() {
@@ -541,9 +543,15 @@ public class MergedTsFileSplitter {
       }
 
       @Override
-      public void serialize(DataOutputStream stream) throws IOException {
+      public void serialize(DataOutputStream stream) throws IOException {}
 
+      @Override
+      public int getSplitId() {
+        return 0;
       }
+
+      @Override
+      public void setSplitId(int sid) {}
     }
 
     private void getAllModification(Map<Long, List<Deletion>> 
offset2Deletions) throws IOException {
@@ -616,17 +624,24 @@ public class MergedTsFileSplitter {
     }
 
     private boolean needDecodeChunk(IChunkMetadata chunkMetadata) {
-      return !TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime())
-          
.equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime()));
+      return !TimePartitionUtils.getTimePartition(
+              chunkMetadata.getStartTime(), timePartitionInterval)
+          .equals(
+              TimePartitionUtils.getTimePartition(
+                  chunkMetadata.getEndTime(), timePartitionInterval));
     }
 
     private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata 
chunkMetadata) {
       if (pageHeader.getStatistics() == null) {
-        return 
!TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime())
-            
.equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime()));
-      }
-      return !TimePartitionUtils.getTimePartition(pageHeader.getStartTime())
-          
.equals(TimePartitionUtils.getTimePartition(pageHeader.getEndTime()));
+        return !TimePartitionUtils.getTimePartition(
+                chunkMetadata.getStartTime(), timePartitionInterval)
+            .equals(
+                TimePartitionUtils.getTimePartition(
+                    chunkMetadata.getEndTime(), timePartitionInterval));
+      }
+      return !TimePartitionUtils.getTimePartition(pageHeader.getStartTime(), 
timePartitionInterval)
+          .equals(
+              TimePartitionUtils.getTimePartition(pageHeader.getEndTime(), 
timePartitionInterval));
     }
 
     private Pair<long[], Object[]> decodePage(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
index 6bfcdfadf44..ab7a3f2e406 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
@@ -57,6 +57,7 @@ public class NonAlignedChunkData implements ChunkData {
 
   private ChunkWriterImpl chunkWriter;
   private Chunk chunk;
+  private int splitId;
 
   public NonAlignedChunkData(
       String device, ChunkHeader chunkHeader, TTimePartitionSlot 
timePartitionSlot) {
@@ -121,6 +122,7 @@ public class NonAlignedChunkData implements ChunkData {
     ReadWriteIOUtils.write(isAligned(), stream);
     serializeAttr(stream);
     byteStream.writeTo(stream);
+    ReadWriteIOUtils.write(splitId, stream);
     close();
   }
 
@@ -309,6 +311,8 @@ public class NonAlignedChunkData implements ChunkData {
     chunkData.pageNumber = pageNumber;
     chunkData.deserializeTsFileData(stream);
     chunkData.close();
+
+    chunkData.setSplitId(ReadWriteIOUtils.readInt(stream));
     return chunkData;
   }
 
@@ -336,6 +340,18 @@ public class NonAlignedChunkData implements ChunkData {
         + chunkHeader
         + ", needDecodeChunk="
         + needDecodeChunk
+        + ", splitId="
+        + splitId
         + '}';
   }
+
+  @Override
+  public int getSplitId() {
+    return splitId;
+  }
+
+  @Override
+  public void setSplitId(int sid) {
+    this.splitId = sid;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
index a53a3f5fc30..79b96471623 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
@@ -37,6 +37,10 @@ public interface TsFileData {
 
   void serialize(DataOutputStream stream) throws IOException;
 
+  int getSplitId();
+
+  void setSplitId(int sid);
+
   static TsFileData deserialize(InputStream stream)
       throws IOException, PageException, IllegalPathException {
     boolean isModification = ReadWriteIOUtils.readBool(stream);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
new file mode 100644
index 00000000000..cc2c2c9556a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * TsFileDataManager batches splits generated by TsFileSplitter as 
LoadTsFilePieceNode, route the
+ * splits to associated replica set, and sends them to the replicas with the 
provided dispatching
+ * function.
+ */
+@SuppressWarnings("BooleanMethodIsAlwaysInverted")
+public class TsFileDataManager {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(TsFileDataManager.class);
+  private final DispatchFunction dispatchFunction;
+  private final DataPartitionBatchFetcher partitionBatchFetcher;
+  private final PlanNodeId planNodeId;
+  private final File targetFile;
+
+  private long dataSize;
+  private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
+  private final List<ChunkData> nonDirectionalChunkData;
+
+  @FunctionalInterface
+  public interface DispatchFunction {
+
+    boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, 
TRegionReplicaSet replicaSet);
+  }
+
+  public TsFileDataManager(
+      DispatchFunction dispatchFunction,
+      PlanNodeId planNodeId,
+      File targetFile,
+      DataPartitionBatchFetcher partitionBatchFetcher) {
+    this.dispatchFunction = dispatchFunction;
+    this.planNodeId = planNodeId;
+    this.targetFile = targetFile;
+    this.dataSize = 0;
+    this.replicaSet2Piece = new HashMap<>();
+    this.nonDirectionalChunkData = new ArrayList<>();
+    this.partitionBatchFetcher = partitionBatchFetcher;
+  }
+
+  public boolean addOrSendTsFileData(TsFileData tsFileData) {
+    return tsFileData.isModification()
+        ? addOrSendDeletionData(tsFileData)
+        : addOrSendChunkData((ChunkData) tsFileData);
+  }
+
+  private boolean addOrSendChunkData(ChunkData chunkData) {
+    nonDirectionalChunkData.add(chunkData);
+    dataSize += chunkData.getDataSize();
+
+    if (dataSize > LoadTsFileScheduler.MAX_MEMORY_SIZE) {
+      routeChunkData();
+
+      // start to dispatch from the biggest TsFilePieceNode
+      List<TRegionReplicaSet> sortedReplicaSets =
+          replicaSet2Piece.keySet().stream()
+              .sorted(
+                  Comparator.comparingLong(o -> 
replicaSet2Piece.get(o).getDataSize()).reversed())
+              .collect(Collectors.toList());
+
+      for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
+        LoadTsFilePieceNode pieceNode = replicaSet2Piece.get(sortedReplicaSet);
+        if (pieceNode.getDataSize() == 0) { // total data size has been 
reduced to 0
+          break;
+        }
+        if (!dispatchFunction.dispatchOnePieceNode(pieceNode, 
sortedReplicaSet)) {
+          return false;
+        }
+
+        dataSize -= pieceNode.getDataSize();
+        replicaSet2Piece.put(
+            sortedReplicaSet,
+            new LoadTsFilePieceNode(
+                planNodeId, targetFile)); // can not just remove, because of 
deletion
+        if (dataSize <= LoadTsFileScheduler.MAX_MEMORY_SIZE) {
+          break;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  private void routeChunkData() {
+    if (nonDirectionalChunkData.isEmpty()) {
+      return;
+    }
+
+    List<TRegionReplicaSet> replicaSets =
+        partitionBatchFetcher.queryDataPartition(
+            nonDirectionalChunkData.stream()
+                .map(data -> new Pair<>(data.getDevice(), 
data.getTimePartitionSlot()))
+                .collect(Collectors.toList()));
+    IntStream.range(0, nonDirectionalChunkData.size())
+        .forEach(
+            i ->
+                replicaSet2Piece
+                    .computeIfAbsent(
+                        replicaSets.get(i), o -> new 
LoadTsFilePieceNode(planNodeId, targetFile))
+                    .addTsFileData(nonDirectionalChunkData.get(i)));
+    nonDirectionalChunkData.clear();
+  }
+
+  private boolean addOrSendDeletionData(TsFileData deletionData) {
+    routeChunkData(); // ensure chunk data will be added before deletion
+
+    for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : 
replicaSet2Piece.entrySet()) {
+      dataSize += deletionData.getDataSize();
+      entry.getValue().addTsFileData(deletionData);
+    }
+    return true;
+  }
+
+  public boolean sendAllTsFileData() {
+    routeChunkData();
+
+    for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : 
replicaSet2Piece.entrySet()) {
+      if (!dispatchFunction.dispatchOnePieceNode(entry.getValue(), 
entry.getKey())) {
+        logger.warn("Dispatch piece node {} of TsFile {} error.", 
entry.getValue(), targetFile);
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
new file mode 100644
index 00000000000..06dc76881d3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
+
+public class TsFileSplitSender {
+
+  private static final int MAX_RETRY = 5;
+  private static final long RETRY_INTERVAL_MS = 6_000L;
+  private static final Logger logger = 
LoggerFactory.getLogger(TsFileSplitSender.class);
+
+  private LoadTsFileNode loadTsFileNode;
+  private DataPartitionBatchFetcher targetPartitionFetcher;
+  private long targetPartitionInterval;
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+  // All consensus groups accessed in Phase1 should be notified in Phase2
+  private final Set<TRegionReplicaSet> allReplicaSets = new 
ConcurrentSkipListSet<>();
+  private String uuid;
+  private Map<TDataNodeLocation, Double> dataNodeThroughputMap = new 
ConcurrentHashMap<>();
+  private Random random = new Random();
+  private boolean isGeneratedByPipe;
+  private Map<Pair<LoadTsFilePieceNode, TRegionReplicaSet>, Exception> 
phaseOneFailures =
+      new ConcurrentHashMap<>();
+  private Map<TRegionReplicaSet, Exception> phaseTwoFailures = new HashMap<>();
+
+  public TsFileSplitSender(
+      LoadTsFileNode loadTsFileNode,
+      DataPartitionBatchFetcher targetPartitionFetcher,
+      long targetPartitionInterval,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager,
+      boolean isGeneratedByPipe) {
+    this.loadTsFileNode = loadTsFileNode;
+    this.targetPartitionFetcher = targetPartitionFetcher;
+    this.targetPartitionInterval = targetPartitionInterval;
+    this.internalServiceClientManager = internalServiceClientManager;
+    this.isGeneratedByPipe = isGeneratedByPipe;
+  }
+
+  public void start() throws IOException {
+    // skip files without data
+    loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
+    uuid = UUID.randomUUID().toString();
+
+    boolean isFirstPhaseSuccess = firstPhase(loadTsFileNode);
+    boolean isSecondPhaseSuccess = secondPhase(isFirstPhaseSuccess);
+    if (isFirstPhaseSuccess && isSecondPhaseSuccess) {
+      logger.info("Load TsFiles {} Successfully", 
loadTsFileNode.getResources());
+    } else {
+      logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
+    }
+  }
+
+  private boolean firstPhase(LoadTsFileNode node) throws IOException {
+    TsFileDataManager tsFileDataManager =
+        new TsFileDataManager(
+            this::dispatchOnePieceNode,
+            node.getPlanNodeId(),
+            node.lastResource().getTsFile(),
+            targetPartitionFetcher);
+    ExecutorService executorService =
+        IoTDBThreadPoolFactory.newThreadPool(
+            16,
+            Integer.MAX_VALUE,
+            20,
+            TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new IoTThreadFactory("MergedTsFileSplitter"),
+            "MergedTsFileSplitter");
+    new MergedTsFileSplitter(
+            node.getResources().stream()
+                .map(TsFileResource::getTsFile)
+                .collect(Collectors.toList()),
+            tsFileDataManager::addOrSendTsFileData,
+            executorService,
+            targetPartitionInterval)
+        .splitTsFileByDataPartition();
+    return tsFileDataManager.sendAllTsFileData() && phaseOneFailures.isEmpty();
+  }
+
+  private boolean secondPhase(boolean isFirstPhaseSuccess) {
+    logger.info("Start dispatching Load command for uuid {}", uuid);
+    TLoadCommandReq loadCommandReq =
+        new TLoadCommandReq(
+            (isFirstPhaseSuccess ? LoadCommand.EXECUTE : 
LoadCommand.ROLLBACK).ordinal(), uuid);
+    loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
+
+    for (TRegionReplicaSet replicaSet : allReplicaSets) {
+      loadCommandReq.setUseConsensus(true);
+      for (TDataNodeLocation dataNodeLocation : 
replicaSet.getDataNodeLocations()) {
+        TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
+        Exception groupException = null;
+        loadCommandReq.setConsensusGroupId(replicaSet.getRegionId());
+
+        for (int i = 0; i < MAX_RETRY; i++) {
+          try (SyncDataNodeInternalServiceClient client =
+              internalServiceClientManager.borrowClient(endPoint)) {
+            TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+            if (!loadResp.isAccepted()) {
+              logger.warn(loadResp.message);
+              groupException = new 
FragmentInstanceDispatchException(loadResp.status);
+            }
+            break;
+          } catch (ClientManagerException | TException e) {
+            logger.warn(NODE_CONNECTION_ERROR, endPoint, e);
+            TSStatus status = new TSStatus();
+            status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+            status.setMessage(
+                "can't connect to node {}, please reset longer 
dn_connection_timeout_ms "
+                    + "in iotdb-common.properties and restart iotdb."
+                    + endPoint);
+            groupException = new FragmentInstanceDispatchException(status);
+          }
+          try {
+            Thread.sleep(RETRY_INTERVAL_MS);
+          } catch (InterruptedException e) {
+            groupException = e;
+            break;
+          }
+        }
+
+        if (groupException != null) {
+          phaseTwoFailures.put(replicaSet, groupException);
+        } else {
+          break;
+        }
+      }
+    }
+
+    return phaseTwoFailures.isEmpty();
+  }
+
+  /**
+   * The rank (probability of being chosen) is calculated as throughput / 
totalThroughput for those
+   * nodes that have not been used, their throughput is defined as 
Float.MAX_VALUE
+   *
+   * @param replicaSet replica set to be ranked
+   * @return the nodes and their ranks
+   */
+  private List<Pair<TDataNodeLocation, Double>> 
rankLocations(TRegionReplicaSet replicaSet) {
+    List<Pair<TDataNodeLocation, Double>> locations =
+        new ArrayList<>(replicaSet.dataNodeLocations.size());
+    // retrieve throughput of each node
+    double totalThroughput = 0.0;
+    for (TDataNodeLocation dataNodeLocation : 
replicaSet.getDataNodeLocations()) {
+      // use Float.MAX_VALUE so that they can be added together
+      double throughput =
+          dataNodeThroughputMap.computeIfAbsent(dataNodeLocation, l -> 
(double) Float.MAX_VALUE);
+      locations.add(new Pair<>(dataNodeLocation, throughput));
+      totalThroughput += throughput;
+    }
+    // calculate cumulative ranks
+    locations.get(0).right = locations.get(0).right / totalThroughput;
+    for (int i = 1; i < locations.size(); i++) {
+      Pair<TDataNodeLocation, Double> location = locations.get(i);
+      location.right = location.right / totalThroughput + locations.get(i - 
1).right;
+    }
+    return locations;
+  }
+
+  private Pair<TDataNodeLocation, Double> chooseNextLocation(
+      List<Pair<TDataNodeLocation, Double>> locations) {
+    int chosen = 0;
+    double dice = random.nextDouble();
+    for (int i = 1; i < locations.size() - 1; i++) {
+      if (locations.get(i - 1).right <= dice && dice < locations.get(i).right) 
{
+        chosen = i;
+      }
+    }
+    Pair<TDataNodeLocation, Double> chosenPair = locations.remove(chosen);
+    // update ranks
+    for (Pair<TDataNodeLocation, Double> location : locations) {
+      location.right = location.right / (1 - chosenPair.right);
+    }
+    return chosenPair;
+  }
+
+  @SuppressWarnings("BusyWait")
+  public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, 
TRegionReplicaSet replicaSet) {
+    allReplicaSets.add(replicaSet);
+
+    TTsFilePieceReq loadTsFileReq =
+        new TTsFilePieceReq(pieceNode.serializeToByteBuffer(), uuid, 
replicaSet.getRegionId());
+    loadTsFileReq.isRelay = true;
+    List<Pair<TDataNodeLocation, Double>> locations = 
rankLocations(replicaSet);
+
+    long startTime = 0;
+    boolean loadSucceed = false;
+    Exception lastConnectionError = null;
+    TDataNodeLocation currLocation = null;
+    while (!locations.isEmpty()) {
+      // the chosen location is removed from the list
+      Pair<TDataNodeLocation, Double> locationRankPair = 
chooseNextLocation(locations);
+      currLocation = locationRankPair.left;
+      startTime = System.currentTimeMillis();
+      for (int i = 0; i < MAX_RETRY; i++) {
+        try (SyncDataNodeInternalServiceClient client =
+            
internalServiceClientManager.borrowClient(currLocation.internalEndPoint)) {
+          TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
+          if (!loadResp.isAccepted()) {
+            logger.warn(loadResp.message);
+            phaseOneFailures.put(
+                new Pair<>(pieceNode, replicaSet),
+                new FragmentInstanceDispatchException(loadResp.status));
+            return false;
+          }
+          loadSucceed = true;
+          break;
+        } catch (ClientManagerException | TException e) {
+          lastConnectionError = e;
+        }
+
+        try {
+          Thread.sleep(RETRY_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          return false;
+        }
+      }
+
+      if (loadSucceed) {
+        break;
+      }
+    }
+
+    if (!loadSucceed) {
+      String warning = NODE_CONNECTION_ERROR;
+      logger.warn(warning, locations, lastConnectionError);
+      TSStatus status = new TSStatus();
+      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+      status.setMessage(warning + locations);
+      phaseOneFailures.put(
+          new Pair<>(pieceNode, replicaSet), new 
FragmentInstanceDispatchException(status));
+      return false;
+    }
+    long timeConsumption = System.currentTimeMillis() - startTime;
+    dataNodeThroughputMap.put(currLocation, pieceNode.getDataSize() * 1.0 / 
timeConsumption);
+    return true;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
index 2c915c10941..5542e04cf65 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 public class TsFileSplitter {
@@ -65,10 +66,16 @@ public class TsFileSplitter {
 
   private final File tsFile;
   private final Function<TsFileData, Boolean> consumer;
+  private final AtomicInteger splitIdGenerator = new AtomicInteger();
 
-  public TsFileSplitter(File tsFile, Function<TsFileData, Boolean> consumer) {
+  public TsFileSplitter(File tsFile, Function<TsFileData, Boolean> consumer, 
int startSplitId) {
     this.tsFile = tsFile;
     this.consumer = consumer;
+    splitIdGenerator.set(startSplitId);
+  }
+
+  public int getCurrentSplitId() {
+    return splitIdGenerator.get();
   }
 
   @SuppressWarnings({"squid:S3776", "squid:S6541"})
@@ -344,7 +351,12 @@ public class TsFileSplitter {
       offset2Deletions
           .pollFirstEntry()
           .getValue()
-          .forEach(o -> consumer.apply(new DeletionData(o)));
+          .forEach(
+              o -> {
+                DeletionData deletionData = new DeletionData(o);
+                deletionData.setSplitId(splitIdGenerator.incrementAndGet());
+                consumer.apply(deletionData);
+              });
     }
   }
 
@@ -359,6 +371,7 @@ public class TsFileSplitter {
       allChunkData.addAll(entry.getValue());
     }
     for (ChunkData chunkData : allChunkData) {
+      chunkData.setSplitId(splitIdGenerator.incrementAndGet());
       if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
         throw new IllegalStateException(
             String.format(
@@ -370,6 +383,7 @@ public class TsFileSplitter {
   }
 
   private void consumeChunkData(String measurement, long offset, ChunkData 
chunkData) {
+    chunkData.setSplitId(splitIdGenerator.incrementAndGet());
     if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
       throw new IllegalStateException(
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 793e6139693..6e9cc5bee33 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -111,4 +111,12 @@ public class LoadTsFileNode extends WritePlanNode {
   public int hashCode() {
     return Objects.hash(resources);
   }
+
  /**
   * Returns the TsFile resources scheduled for loading.
   *
   * <p>The internal list itself is returned (no defensive copy), so callers can and do mutate it
   * — e.g. removing empty files before a load.
   */
  public List<TsFileResource> getResources() {
    return resources;
  }
+
  /**
   * Returns the last resource in the load list.
   *
   * <p>NOTE(review): assumes {@code resources} is non-empty; an empty list throws
   * {@code IndexOutOfBoundsException} — confirm callers guarantee this.
   */
  public TsFileResource lastResource() {
    return resources.get(resources.size() - 1);
  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 620920a01a8..f44c01e0b8f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -70,7 +70,7 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
   private final ExecutorService executor;
   private final boolean isGeneratedByPipe;
 
-  private static final String NODE_CONNECTION_ERROR = "can't connect to node 
{}";
+  public static final String NODE_CONNECTION_ERROR = "can't connect to node 
{}";
 
   public LoadTsFileDispatcherImpl(
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 834829c3849..cf93395cbf6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -22,12 +22,9 @@ package org.apache.iotdb.db.queryengine.plan.scheduler.load;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.partition.DataPartition;
-import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -36,8 +33,8 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
-import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
+import 
org.apache.iotdb.db.queryengine.execution.load.DataPartitionBatchFetcher;
+import org.apache.iotdb.db.queryengine.execution.load.TsFileDataManager;
 import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
@@ -49,7 +46,6 @@ import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import io.airlift.units.Duration;
 import org.slf4j.Logger;
@@ -58,11 +54,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
@@ -70,8 +63,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * {@link LoadTsFileScheduler} is used for scheduling {@link 
LoadSingleTsFileNode} and {@link
@@ -81,10 +72,11 @@ import java.util.stream.IntStream;
  * 
href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe";>...</a>;
  */
 public class LoadTsFileScheduler implements IScheduler {
+
   private static final Logger logger = 
LoggerFactory.getLogger(LoadTsFileScheduler.class);
   public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // one day
-  private static final long MAX_MEMORY_SIZE;
-  private static final int TRANSMIT_LIMIT;
+  public static final long MAX_MEMORY_SIZE;
+  public static final int TRANSMIT_LIMIT;
 
   static {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -96,6 +88,10 @@ public class LoadTsFileScheduler implements IScheduler {
         
CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
   }
 
  /**
   * Returns the configured memory threshold used to decide when buffered piece nodes must be
   * flushed during a load (taken from {@code MAX_MEMORY_SIZE}, initialized from the IoTDB config).
   */
  public static long getMaxMemorySize() {
    return MAX_MEMORY_SIZE;
  }
+
   private final MPPQueryContext queryContext;
   private final QueryStateMachine stateMachine;
   private final LoadTsFileDispatcherImpl dispatcher;
@@ -191,9 +187,14 @@ public class LoadTsFileScheduler implements IScheduler {
 
   private boolean firstPhase(LoadSingleTsFileNode node) {
     try {
-      TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node);
+      TsFileDataManager tsFileDataManager =
+          new TsFileDataManager(
+              this::dispatchOnePieceNode,
+              node.getPlanNodeId(),
+              node.getTsFileResource().getTsFile(),
+              partitionFetcher);
       new TsFileSplitter(
-              node.getTsFileResource().getTsFile(), 
tsFileDataManager::addOrSendTsFileData)
+              node.getTsFileResource().getTsFile(), 
tsFileDataManager::addOrSendTsFileData, 0)
           .splitTsFileByDataPartition();
       if (!tsFileDataManager.sendAllTsFileData()) {
         stateMachine.transitionToFailed(new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
@@ -216,8 +217,7 @@ public class LoadTsFileScheduler implements IScheduler {
     return true;
   }
 
-  private boolean dispatchOnePieceNode(
-      LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) {
+  public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, 
TRegionReplicaSet replicaSet) {
     allReplicaSets.add(replicaSet);
     FragmentInstance instance =
         new FragmentInstance(
@@ -361,154 +361,7 @@ public class LoadTsFileScheduler implements IScheduler {
     ROLLBACK
   }
 
-  private static class TsFileDataManager {
-    private final LoadTsFileScheduler scheduler;
-    private final LoadSingleTsFileNode singleTsFileNode;
-
-    private long dataSize;
-    private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
-    private final List<ChunkData> nonDirectionalChunkData;
-
-    public TsFileDataManager(LoadTsFileScheduler scheduler, 
LoadSingleTsFileNode singleTsFileNode) {
-      this.scheduler = scheduler;
-      this.singleTsFileNode = singleTsFileNode;
-      this.dataSize = 0;
-      this.replicaSet2Piece = new HashMap<>();
-      this.nonDirectionalChunkData = new ArrayList<>();
-    }
-
-    private boolean addOrSendTsFileData(TsFileData tsFileData) {
-      return tsFileData.isModification()
-          ? addOrSendDeletionData(tsFileData)
-          : addOrSendChunkData((ChunkData) tsFileData);
-    }
-
-    private boolean addOrSendChunkData(ChunkData chunkData) {
-      nonDirectionalChunkData.add(chunkData);
-      dataSize += chunkData.getDataSize();
-
-      if (dataSize > MAX_MEMORY_SIZE) {
-        routeChunkData();
-
-        // start to dispatch from the biggest TsFilePieceNode
-        List<TRegionReplicaSet> sortedReplicaSets =
-            replicaSet2Piece.keySet().stream()
-                .sorted(
-                    Comparator.comparingLong(o -> 
replicaSet2Piece.get(o).getDataSize()).reversed())
-                .collect(Collectors.toList());
-
-        for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
-          LoadTsFilePieceNode pieceNode = 
replicaSet2Piece.get(sortedReplicaSet);
-          if (pieceNode.getDataSize() == 0) { // total data size has been 
reduced to 0
-            break;
-          }
-          if (!scheduler.dispatchOnePieceNode(pieceNode, sortedReplicaSet)) {
-            return false;
-          }
-
-          dataSize -= pieceNode.getDataSize();
-          replicaSet2Piece.put(
-              sortedReplicaSet,
-              new LoadTsFilePieceNode(
-                  singleTsFileNode.getPlanNodeId(),
-                  singleTsFileNode
-                      .getTsFileResource()
-                      .getTsFile())); // can not just remove, because of 
deletion
-          if (dataSize <= MAX_MEMORY_SIZE) {
-            break;
-          }
-        }
-      }
-
-      return true;
-    }
-
-    private void routeChunkData() {
-      if (nonDirectionalChunkData.isEmpty()) {
-        return;
-      }
-
-      List<TRegionReplicaSet> replicaSets =
-          scheduler.partitionFetcher.queryDataPartition(
-              nonDirectionalChunkData.stream()
-                  .map(data -> new Pair<>(data.getDevice(), 
data.getTimePartitionSlot()))
-                  .collect(Collectors.toList()));
-      IntStream.range(0, nonDirectionalChunkData.size())
-          .forEach(
-              i ->
-                  replicaSet2Piece
-                      .computeIfAbsent(
-                          replicaSets.get(i),
-                          o ->
-                              new LoadTsFilePieceNode(
-                                  singleTsFileNode.getPlanNodeId(),
-                                  
singleTsFileNode.getTsFileResource().getTsFile()))
-                      .addTsFileData(nonDirectionalChunkData.get(i)));
-      nonDirectionalChunkData.clear();
-    }
-
-    private boolean addOrSendDeletionData(TsFileData deletionData) {
-      routeChunkData(); // ensure chunk data will be added before deletion
-
-      for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : 
replicaSet2Piece.entrySet()) {
-        dataSize += deletionData.getDataSize();
-        entry.getValue().addTsFileData(deletionData);
-      }
-      return true;
-    }
-
-    private boolean sendAllTsFileData() {
-      routeChunkData();
-
-      for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : 
replicaSet2Piece.entrySet()) {
-        if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) 
{
-          logger.warn(
-              "Dispatch piece node {} of TsFile {} error.",
-              entry.getValue(),
-              singleTsFileNode.getTsFileResource().getTsFile());
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  private static class DataPartitionBatchFetcher {
-    private final IPartitionFetcher fetcher;
-
-    public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
-      this.fetcher = fetcher;
-    }
-
-    public List<TRegionReplicaSet> queryDataPartition(
-        List<Pair<String, TTimePartitionSlot>> slotList) {
-      List<TRegionReplicaSet> replicaSets = new ArrayList<>();
-      int size = slotList.size();
-
-      for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
-        List<Pair<String, TTimePartitionSlot>> subSlotList =
-            slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
-        DataPartition dataPartition = 
fetcher.getOrCreateDataPartition(toQueryParam(subSlotList));
-        replicaSets.addAll(
-            subSlotList.stream()
-                .map(pair -> 
dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right))
-                .collect(Collectors.toList()));
-      }
-      return replicaSets;
-    }
-
-    private List<DataPartitionQueryParam> toQueryParam(
-        List<Pair<String, TTimePartitionSlot>> slots) {
-      return slots.stream()
-          .collect(
-              Collectors.groupingBy(
-                  Pair::getLeft, Collectors.mapping(Pair::getRight, 
Collectors.toSet())))
-          .entrySet()
-          .stream()
-          .map(
-              entry ->
-                  new DataPartitionQueryParam(entry.getKey(), new 
ArrayList<>(entry.getValue())))
-          .collect(Collectors.toList());
-    }
  /** Exposes this scheduler's batched data-partition fetcher for reuse by external senders. */
  public DataPartitionBatchFetcher getPartitionFetcher() {
    return partitionFetcher;
  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
index 3b5e0f8177e..e72a2e2a17d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
@@ -32,6 +32,12 @@ public class TimePartitionUtils {
     return timePartitionSlot;
   }
 
+  public static TTimePartitionSlot getTimePartition(long time, long 
timePartitionInterval) {
+    TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
+    timePartitionSlot.setStartTime(time - time % timePartitionInterval);
+    return timePartitionSlot;
+  }
+
   public static long getTimePartitionInterval() {
     return timePartitionInterval;
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
index 6461ce04585..1d31c90ccc9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
@@ -19,14 +19,19 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
+import static org.junit.Assert.assertEquals;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+
+import org.junit.Test;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
-import org.junit.Test;
 
 public class MergedTsFileSplitterTest extends TestBase {
 
@@ -35,20 +40,30 @@ public class MergedTsFileSplitterTest extends TestBase {
   @Test
   public void testSplit() throws IOException {
     long start = System.currentTimeMillis();
-    MergedTsFileSplitter splitter = new MergedTsFileSplitter(files, 
this::consumeSplit,
-        IoTDBThreadPoolFactory.newThreadPool(16, Integer.MAX_VALUE, 20, 
TimeUnit.SECONDS,
-            new SynchronousQueue<>(),
-            new IoTThreadFactory("MergedTsFileSplitter"), 
"MergedTsFileSplitter"));
+    MergedTsFileSplitter splitter =
+        new MergedTsFileSplitter(
+            files,
+            this::consumeSplit,
+            IoTDBThreadPoolFactory.newThreadPool(
+                16,
+                Integer.MAX_VALUE,
+                20,
+                TimeUnit.SECONDS,
+                new SynchronousQueue<>(),
+                new IoTThreadFactory("MergedTsFileSplitter"),
+                "MergedTsFileSplitter"),
+            TimePartitionUtils.getTimePartitionInterval());
     try {
       splitter.splitTsFileByDataPartition();
       for (TsFileData tsFileData : resultSet) {
-//         System.out.println(tsFileData);
+        //         System.out.println(tsFileData);
       }
     } finally {
       splitter.close();
     }
-    System.out.printf("%d splits after %dms\n", resultSet.size(),
-        System.currentTimeMillis() - start);
+    System.out.printf(
+        "%d splits after %dms\n", resultSet.size(), System.currentTimeMillis() 
- start);
+    assertEquals(resultSet.size(), expectedChunkNum());
   }
 
   public boolean consumeSplit(TsFileData data) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index cbbab9bd7c3..d10acdebffc 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@ -19,12 +19,34 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.IntStream;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.client.ClientManager;
+import 
org.apache.iotdb.commons.client.ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -33,11 +55,35 @@ import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TField;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TMap;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolDecorator;
+import org.apache.thrift.protocol.TSet;
+import org.apache.thrift.protocol.TStruct;
+import org.apache.thrift.transport.TByteBuffer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
 public class TestBase {
 
   private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
@@ -47,19 +93,29 @@ public class TestBase {
   public static final String TEST_TSFILE_PATH =
       BASE_OUTPUT_PATH + "testTsFile".concat(File.separator) + 
PARTIAL_PATH_STRING;
 
-  protected int fileNum = 100;
+  protected int fileNum = new Random().nextInt(100) + 1;
  // series number of each file, sn non-aligned series and 1 aligned series with sn measurements
-  protected int seriesNum = 100;
+  protected int seriesNum = 1;
  // number of chunks of each series in a file, each series has only one chunk in a file
   protected double chunkTimeRangeRatio = 0.3;
   // the interval between two consecutive points of a series
   protected long pointInterval = 50_000;
-  protected List<File> files = new ArrayList<>();
+  protected final List<File> files = new ArrayList<>();
+  protected final List<TsFileResource> tsFileResources = new ArrayList<>();
+  protected IPartitionFetcher partitionFetcher;
+  // the key is deviceId, not partitioned by time in the simple test
+  protected Map<String, TRegionReplicaSet> partitionTable = new HashMap<>();
+  protected Map<ConsensusGroupId, TRegionReplicaSet> groupId2ReplicaSetMap = 
new HashMap<>();
+  protected IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
 
   @Before
   public void setup() throws IOException, WriteProcessException {
     setupFiles();
-    logger.info("Files set up");
+    logger.info("{} files set up", files.size());
+    partitionFetcher = dummyPartitionFetcher();
+    setupPartitionTable();
+    setupClientManager();
   }
 
   @After
@@ -69,48 +125,221 @@ public class TestBase {
     }
   }
 
-  public void setupFiles() {
+  public int expectedChunkNum() {
+    double totalTimeRange = chunkTimeRangeRatio * fileNum;
+    int splitChunkNum = 0;
+    // if the boundary of the ith partition does not overlap a chunk, it introduces an additional split
+    // TODO: due to machine precision, the calculation may have error
+    for (int i = 0; i <= totalTimeRange; i++) {
+      if (i * 1.0 % chunkTimeRangeRatio > 0.00001) {
+        splitChunkNum += 1;
+      }
+    }
+    return (splitChunkNum + fileNum) * seriesNum * 2;
+  }
 
-    IntStream.range(0, fileNum).parallel().forEach(i -> {
-      try {
-        File file = new File(getTestTsFilePath("root.sg1", 0, 0, i));
-        synchronized (files) {
-          files.add(file);
-        }
-
-        try (TsFileWriter writer = new TsFileWriter(file)) {
-          // 3 non-aligned series under d1 and 1 aligned series with 3 
measurements under d2
-          for (int sn = 0; sn < seriesNum; sn++) {
-            writer.registerTimeseries(
-                new Path("d1"), new MeasurementSchema("s" + sn, 
TSDataType.DOUBLE));
-          }
-          List<MeasurementSchema> alignedSchemas = new ArrayList<>();
-          for (int sn = 0; sn < seriesNum; sn++) {
-            alignedSchemas.add(new MeasurementSchema("s" + sn, 
TSDataType.DOUBLE));
-          }
-          writer.registerAlignedTimeseries(new Path("d2"), alignedSchemas);
+  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint) {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+    LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) 
PlanNodeType.deserialize(req.body);
+    return new TLoadResp();
+  }
+
+  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint) {
+    return new TLoadResp();
+  }
+
+  public TProtocol dummyProtocol() throws TTransportException {
+    return new TBinaryProtocol(new TByteBuffer(ByteBuffer.allocate(0)));
+  }
+
+  public void setupClientManager() {
+    SyncDataNodeInternalServiceClientPoolFactory poolFactory =
+        new SyncDataNodeInternalServiceClientPoolFactory();
+    internalServiceClientManager =
+        new ClientManager<TEndPoint, 
SyncDataNodeInternalServiceClient>(poolFactory) {
+          @Override
+          public SyncDataNodeInternalServiceClient borrowClient(TEndPoint 
node) {
+            try {
+              return new SyncDataNodeInternalServiceClient(
+                  dummyProtocol(),
+                  new ThriftClientProperty.Builder().build(), node, this) {
+                @Override
+                public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) {
+                  return handleTsFilePieceNode(req, getTEndpoint());
+                }
 
-          long timePartitionInterval = 
TimePartitionUtils.getTimePartitionInterval();
-          long chunkTimeRange = (long) (timePartitionInterval * 
chunkTimeRangeRatio);
-          int chunkPointNum = (int) (chunkTimeRange / pointInterval);
+                @Override
+                public TLoadResp sendLoadCommand(TLoadCommandReq req) {
+                  return handleTsLoadCommand(req, getTEndpoint());
+                }
 
-          for (int pn = 0; pn < chunkPointNum; pn++) {
-            long currTime = chunkTimeRange * i + pointInterval * pn;
-            TSRecord record = new TSRecord(currTime, "d1");
-            for (int sn = 0; sn < seriesNum; sn++) {
-              record.addTuple(new DoubleDataPoint("s" + sn, pn * 1.0));
+                @Override
+                public void close() {
+                }
+              };
+            } catch (TTransportException e) {
+              throw new RuntimeException(e);
             }
-            writer.write(record);
+          }
 
-            record.deviceId = "d2";
-            writer.writeAligned(record);
+          @Override
+          public void clear(TEndPoint node) {
           }
-          writer.flushAllChunkGroups();
-        }
-      } catch (IOException | WriteProcessException e) {
-        throw new RuntimeException(e);
+
+          @Override
+          public void close() {
+          }
+        };
+  }
+
+  public void setupPartitionTable() {
+    ConsensusGroupId d1GroupId = 
Factory.create(TConsensusGroupType.DataRegion.getValue(),
+        0);
+    TRegionReplicaSet d1Replicas =
+        new TRegionReplicaSet(
+            d1GroupId.convertToTConsensusGroupId(),
+            Arrays.asList(
+                new TDataNodeLocation()
+                    .setDataNodeId(0)
+                    .setInternalEndPoint(new TEndPoint("localhost", 10000)),
+                new TDataNodeLocation()
+                    .setDataNodeId(1)
+                    .setInternalEndPoint(new TEndPoint("localhost", 10001)),
+                new TDataNodeLocation()
+                    .setDataNodeId(2)
+                    .setInternalEndPoint(new TEndPoint("localhost", 10002))));
+    partitionTable.put("d1", d1Replicas);
+    groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
+
+    ConsensusGroupId d2GroupId = 
Factory.create(TConsensusGroupType.DataRegion.getValue(), 1);
+    TRegionReplicaSet d2Replicas =
+        new TRegionReplicaSet(
+            d2GroupId.convertToTConsensusGroupId(),
+            Arrays.asList(
+                new TDataNodeLocation()
+                    .setDataNodeId(3)
+                    .setInternalEndPoint(new TEndPoint("localhost", 10003)),
+                new TDataNodeLocation()
+                    .setDataNodeId(4)
+                    .setInternalEndPoint(new TEndPoint("localhost", 10004)),
+                new TDataNodeLocation()
+                    .setDataNodeId(5)
+                    .setInternalEndPoint(new TEndPoint("localhost", 10005))));
+    partitionTable.put("d2", d2Replicas);
+    groupId2ReplicaSetMap.put(d2GroupId, d2Replicas);
+  }
+
+  public IPartitionFetcher dummyPartitionFetcher() {
+    return new IPartitionFetcher() {
+      @Override
+      public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
+        return null;
+      }
+
+      @Override
+      public SchemaPartition getOrCreateSchemaPartition(PathPatternTree 
patternTree) {
+        return null;
+      }
+
+      @Override
+      public DataPartition getDataPartition(
+          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+        return null;
+      }
+
+      @Override
+      public DataPartition getDataPartitionWithUnclosedTimeRange(
+          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+        return null;
+      }
+
+      @Override
+      public DataPartition getOrCreateDataPartition(
+          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+        return null;
+      }
+
+      @Override
+      public DataPartition getOrCreateDataPartition(
+          List<DataPartitionQueryParam> dataPartitionQueryParams) {
+        return null;
+      }
+
+      @Override
+      public SchemaNodeManagementPartition 
getSchemaNodeManagementPartitionWithLevel(
+          PathPatternTree patternTree, Integer level) {
+        return null;
+      }
+
+      @Override
+      public boolean updateRegionCache(TRegionRouteReq req) {
+        return false;
       }
-    });
+
+      @Override
+      public void invalidAllCache() {
+      }
+    };
+  }
+
+  public void setupFiles() {
+
+    IntStream.range(0, fileNum)
+        .parallel()
+        .forEach(
+            i -> {
+              try {
+                File file = new File(getTestTsFilePath("root.sg1", 0, 0, i));
+                TsFileResource tsFileResource = new TsFileResource(file);
+                synchronized (files) {
+                  files.add(file);
+                }
+
+                try (TsFileWriter writer = new TsFileWriter(file)) {
+                  // sn non-aligned series under d1 and 1 aligned series with sn measurements under
+                  // d2
+                  for (int sn = 0; sn < seriesNum; sn++) {
+                    writer.registerTimeseries(
+                        new Path("d1"), new MeasurementSchema("s" + sn, 
TSDataType.DOUBLE));
+                  }
+                  List<MeasurementSchema> alignedSchemas = new ArrayList<>();
+                  for (int sn = 0; sn < seriesNum; sn++) {
+                    alignedSchemas.add(new MeasurementSchema("s" + sn, 
TSDataType.DOUBLE));
+                  }
+                  writer.registerAlignedTimeseries(new Path("d2"), 
alignedSchemas);
+
+                  // one chunk for each series
+                  long timePartitionInterval = 
TimePartitionUtils.getTimePartitionInterval();
+                  long chunkTimeRange = (long) (timePartitionInterval * 
chunkTimeRangeRatio);
+                  int chunkPointNum = (int) (chunkTimeRange / pointInterval);
+
+                  for (int pn = 0; pn < chunkPointNum; pn++) {
+                    long currTime = chunkTimeRange * i + pointInterval * pn;
+                    TSRecord record = new TSRecord(currTime, "d1");
+                    for (int sn = 0; sn < seriesNum; sn++) {
+                      record.addTuple(new DoubleDataPoint("s" + sn, pn * 1.0));
+                    }
+                    writer.write(record);
+
+                    record.deviceId = "d2";
+                    writer.writeAligned(record);
+                  }
+                  writer.flushAllChunkGroups();
+                  tsFileResource.updateStartTime("d1", chunkTimeRange * i);
+                  tsFileResource.updateStartTime("d2", chunkTimeRange * i);
+                  tsFileResource.updateEndTime("d1", chunkTimeRange * (i + 1));
+                  tsFileResource.updateEndTime("d2", chunkTimeRange * (i + 1));
+                }
+
+                tsFileResource.close();
+                synchronized (tsFileResources) {
+                  tsFileResources.add(tsFileResource);
+                }
+              } catch (IOException | WriteProcessException e) {
+                throw new RuntimeException(e);
+              }
+            });
   }
 
   public static String getTestTsFilePath(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
new file mode 100644
index 00000000000..dd8b20963ee
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.junit.Test;
+
+public class TsFileSplitSenderTest extends TestBase {
+
+  // the third key is UUID
+  protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, 
List<TsFileData>>>>>
+      phaseOneResults = new ConcurrentSkipListMap<>();
+  protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> 
phaseTwoResults =
+      new ConcurrentSkipListMap<>();
+
+  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint) {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+    LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) 
PlanNodeType.deserialize(
+        req.body.slice());
+    List<TsFileData> tsFileData =
+        phaseOneResults
+            .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>(
+                Comparator.comparingInt(ConsensusGroupId::getId)))
+            .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
+            .computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
+            .computeIfAbsent(pieceNode.getTsFile(), f -> new ArrayList<>());
+    synchronized (tsFileData) {
+      tsFileData.addAll(pieceNode.getAllTsFileData());
+    }
+
+    // forward to other replicas in the group
+    if (req.isRelay) {
+      req.isRelay = false;
+      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+        TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+        if (!otherPoint.equals(tEndpoint)) {
+          handleTsFilePieceNode(req, otherPoint);
+        }
+      }
+    }
+
+    return new TLoadResp().setAccepted(true)
+        .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint) {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+    phaseTwoResults
+        .computeIfAbsent(tEndpoint,
+            e -> new 
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
+        .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
+        .computeIfAbsent(req.uuid, id -> req.commandType);
+
+    // forward to other replicas in the group
+    if (req.useConsensus) {
+      req.useConsensus = false;
+      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+        TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+        if (!otherPoint.equals(tEndpoint)) {
+          handleTsLoadCommand(req, otherPoint);
+        }
+      }
+    }
+
+    return new TLoadResp().setAccepted(true)
+        .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  @Test
+  public void test() throws IOException {
+    LoadTsFileNode loadTsFileNode =
+        new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
+    DataPartitionBatchFetcher partitionBatchFetcher =
+        new DataPartitionBatchFetcher(partitionFetcher) {
+          @Override
+          public List<TRegionReplicaSet> queryDataPartition(
+              List<Pair<String, TTimePartitionSlot>> slotList) {
+            return slotList.stream()
+                .map(p -> partitionTable.get(p.left))
+                .collect(Collectors.toList());
+          }
+        };
+    TsFileSplitSender splitSender =
+        new TsFileSplitSender(
+            loadTsFileNode,
+            partitionBatchFetcher,
+            TimePartitionUtils.getTimePartitionInterval(),
+            internalServiceClientManager,
+            false);
+    long start = System.currentTimeMillis();
+    splitSender.start();
+    long timeConsumption = System.currentTimeMillis() - start;
+
+    printResult();
+    System.out.printf("Split ends after %dms", timeConsumption);
+  }
+
+  public void printResult() {
+    System.out.print("Phase one:\n");
+    for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, 
List<TsFileData>>>>>
+        tEndPointMapEntry : phaseOneResults.entrySet()) {
+      TEndPoint endPoint = tEndPointMapEntry.getKey();
+      for (Entry<ConsensusGroupId, Map<String, Map<File, List<TsFileData>>>>
+          consensusGroupIdMapEntry : tEndPointMapEntry.getValue().entrySet()) {
+        ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
+        for (Entry<String, Map<File, List<TsFileData>>> stringMapEntry :
+            consensusGroupIdMapEntry.getValue().entrySet()) {
+          String uuid = stringMapEntry.getKey();
+          for (Entry<File, List<TsFileData>> fileListEntry : 
stringMapEntry.getValue().entrySet()) {
+            File tsFile = fileListEntry.getKey();
+            List<TsFileData> chunks = fileListEntry.getValue();
+            System.out.printf(
+                "%s - %s - %s - %s - %s chunks\n", endPoint, consensusGroupId, 
uuid, tsFile, chunks.size());
+            assertEquals(chunks.size(), expectedChunkNum() / 2);
+          }
+        }
+      }
+    }
+
+    System.out.print("Phase two:\n");
+    for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> 
tEndPointMapEntry :
+        phaseTwoResults.entrySet()) {
+      TEndPoint endPoint = tEndPointMapEntry.getKey();
+      for (Entry<ConsensusGroupId, Map<String, Integer>> 
consensusGroupIdMapEntry :
+          tEndPointMapEntry.getValue().entrySet()) {
+        ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
+        for (Entry<String, Integer> stringMapEntry :
+            consensusGroupIdMapEntry.getValue().entrySet()) {
+          String uuid = stringMapEntry.getKey();
+          int command = stringMapEntry.getValue();
+          System.out.printf("%s - %s - %s - %s\n", endPoint, consensusGroupId, 
uuid,
+              LoadCommand.values()[command]);
+          assertEquals(command, LoadCommand.EXECUTE.ordinal());
+        }
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
index 78b61c8d664..62b5995b61b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
@@ -19,26 +19,33 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.junit.Test;
 
-public class TsFileSplitterTest extends TestBase{
+public class TsFileSplitterTest extends TestBase {
   private List<TsFileData> resultSet = new ArrayList<>();
 
   @Test
   public void testSplit() throws IOException {
     long start = System.currentTimeMillis();
+    int splitId = 0;
     for (File file : files) {
-      TsFileSplitter splitter = new TsFileSplitter(file, this::consumeSplit);
+      TsFileSplitter splitter = new TsFileSplitter(file, this::consumeSplit, 
splitId + 1);
       splitter.splitTsFileByDataPartition();
+      splitId = splitter.getCurrentSplitId();
     }
     for (TsFileData tsFileData : resultSet) {
       // System.out.println(tsFileData);
     }
-    System.out.printf("%d splits after %dms\n", resultSet.size(), 
System.currentTimeMillis() - start);
+    System.out.printf(
+        "%d/%d splits after %dms\n", resultSet.size(), expectedChunkNum(), 
System.currentTimeMillis() - start);
+    assertEquals(resultSet.size(), expectedChunkNum());
   }
 
   public boolean consumeSplit(TsFileData data) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 3ee338d4ec7..f99b93674cc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -35,7 +35,7 @@ public class ClientManager<K, V> implements IClientManager<K, 
V> {
 
   private final KeyedObjectPool<K, V> pool;
 
-  ClientManager(IClientPoolFactory<K, V> factory) {
+  protected ClientManager(IClientPoolFactory<K, V> factory) {
     pool = factory.createClientPool(this);
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 2b90b5b2661..91e2f908b9d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
@@ -65,6 +66,20 @@ public class SyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Clien
     getInputProtocol().getTransport().open();
   }
 
+  @TestOnly
+  public SyncDataNodeInternalServiceClient(
+      TProtocol protocol,
+      ThriftClientProperty property,
+      TEndPoint endpoint,
+      ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
clientManager)
+      throws TTransportException {
+    super(protocol);
+    this.printLogWhenEncounterException = 
property.isPrintLogWhenEncounterException();
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    getInputProtocol().getTransport().open();
+  }
+
   public int getTimeout() throws SocketException {
     return ((TimeoutChangeableTransport) 
getInputProtocol().getTransport()).getTimeOut();
   }
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 1268c399c5f..0cd4313e43f 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -308,12 +308,16 @@ struct TTsFilePieceReq{
     1: required binary body
     2: required string uuid
     3: required common.TConsensusGroupId consensusGroupId
+    // if isRelay is true, the receiver should forward the request to other replicas in the group
+    4: optional bool isRelay
 }
 
 struct TLoadCommandReq{
     1: required i32 commandType
     2: required string uuid
     3: optional bool isGeneratedByPipe
+    4: optional bool useConsensus
+    5: optional common.TConsensusGroupId consensusGroupId
 }
 
 struct TLoadResp{


Reply via email to