This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/load_v2 by this push:
     new ec0d8aa02e4 add throughput monitor fix schema registration add configs 
and timers
     new 270f6f6efce Merge branch 'load_v2' of github.com:apache/iotdb into 
load_v2
ec0d8aa02e4 is described below

commit ec0d8aa02e480dc8ff14072d75348c30a915a2ed
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Nov 7 14:09:23 2023 +0800

    add throughput monitor
    fix schema registration
    add configs and timers
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   9 +
 .../confignode/conf/ConfigNodeDescriptor.java      |   8 +
 .../manager/partition/PartitionManager.java        |  62 +++++--
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  12 +-
 .../config/constant/PipeConnectorConstant.java     |   2 +
 .../config/constant/PipeExtractorConstant.java     |   9 +
 .../thrift/async/IoTDBThriftAsyncConnector.java    |  33 +++-
 .../tsfile/PipeBatchTsFileInsertionEvent.java      |  40 +++-
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |  19 +-
 .../historical/BatchedTsFileExtractor.java         |  48 ++++-
 .../extractor/historical/ThroughputMonitor.java    | 125 +++++++++++++
 .../subtask/connector/PipeConnectorSubtask.java    |   4 +
 .../iotdb/db/protocol/client/ConfigNodeClient.java |   6 +
 .../db/protocol/session/ExternalClientSession.java |  92 ++++++++++
 .../iotdb/db/protocol/session/SessionManager.java  |   8 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       | 163 +++++++++++++++--
 .../iotdb/db/queryengine/common/QueryId.java       |   6 +-
 .../execution/load/LoadTsFileManager.java          |  53 ++++--
 .../execution/load/TsFileSplitSender.java          | 203 +++++++++++----------
 .../analyze/cache/partition/PartitionCache.java    |  23 ++-
 .../analyze/partition/BasicPartitionFetcher.java   |   4 +-
 .../analyze/partition/ClusterPartitionFetcher.java |   3 +-
 .../partition/ExternalPartitionFetcher.java        |   5 +-
 .../plan/analyze/schema/SchemaValidator.java       |  23 +++
 .../planner/plan/node/load/LoadTsFileNode.java     |   8 +
 .../execution/load/LoadTsFileManagerTest.java      |   7 +-
 .../execution/load/TsFileSplitSenderTest.java      |  17 +-
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |  12 ++
 .../org/apache/iotdb/tsfile/utils/TsFileUtils.java |   9 +-
 .../src/main/thrift/datanode.thrift                |   6 +-
 30 files changed, 834 insertions(+), 185 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 277a3513f3c..254d089696f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -301,6 +301,7 @@ public class ConfigNodeConfig {
   private boolean isEnablePrintingNewlyCreatedPartition = false;
 
   private long forceWalPeriodForConfigNodeSimpleInMs = 100;
+  private boolean isEnableAutoCreateDatabase = true;
 
   public ConfigNodeConfig() {
     // empty constructor
@@ -1232,4 +1233,12 @@ public class ConfigNodeConfig {
       long dataRegionRatisPeriodicSnapshotInterval) {
     this.dataRegionRatisPeriodicSnapshotInterval = 
dataRegionRatisPeriodicSnapshotInterval;
   }
+
+  public boolean isEnableAutoCreateDatabase() {
+    return isEnableAutoCreateDatabase;
+  }
+
+  public void setEnableAutoCreateDatabase(boolean enableAutoCreateDatabase) {
+    isEnableAutoCreateDatabase = enableAutoCreateDatabase;
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 99cc0e6376d..814f8fcd084 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -429,6 +429,14 @@ public class ConfigNodeDescriptor {
                     String.valueOf(conf.getProcedureCoreWorkerThreadsCount()))
                 .trim()));
 
+    conf.setEnableAutoCreateDatabase(
+        Boolean.parseBoolean(
+            properties
+                .getProperty(
+                    "enable_auto_create_database",
+                    String.valueOf(conf.isEnableAutoCreateDatabase()))
+                .trim()));
+
     loadRatisConsensusConfig(properties);
     loadCQConfig(properties);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index f49fa60f00e..3d57339de64 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -43,6 +43,7 @@ import 
org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
@@ -53,6 +54,7 @@ import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlo
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
@@ -81,10 +83,12 @@ import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDelete
 import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
 import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType;
 import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import 
org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -212,14 +216,27 @@ public class PartitionManager {
     // Check if the related Databases exist
     for (String database : req.getPartitionSlotsMap().keySet()) {
       if (!isDatabaseExist(database)) {
-        return new SchemaPartitionResp(
-            new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())
-                .setMessage(
-                    String.format(
-                        "Create SchemaPartition failed because the database: 
%s is not exists",
-                        database)),
-            false,
-            null);
+        if (!CONF.isEnableAutoCreateDatabase()) {
+          return new SchemaPartitionResp(
+              new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())
+                  .setMessage(
+                      String.format(
+                          "Create SchemaPartition failed because the database: 
%s is not exists",
+                          database)),
+              false,
+              null);
+        } else {
+          TDatabaseSchema databaseSchema = new TDatabaseSchema(database);
+          ConfigNodeRPCServiceProcessor.validateDatabaseSchema(databaseSchema);
+          DatabaseSchemaPlan databaseSchemaPlan =
+              new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, 
databaseSchema);
+          TSStatus tsStatus = 
consensusWritePartitionResult(databaseSchemaPlan);
+          LOGGER.info("Auto create database {}: {}", database, tsStatus);
+          if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              && !isDatabaseExist(database)) {
+            return new SchemaPartitionResp(tsStatus, false, null);
+          }
+        }
       }
     }
 
@@ -335,14 +352,27 @@ public class PartitionManager {
     // Check if the related Databases exist
     for (String database : req.getPartitionSlotsMap().keySet()) {
       if (!isDatabaseExist(database)) {
-        return new DataPartitionResp(
-            new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())
-                .setMessage(
-                    String.format(
-                        "Create DataPartition failed because the database: %s 
is not exists",
-                        database)),
-            false,
-            null);
+        if (!CONF.isEnableAutoCreateDatabase()) {
+          return new DataPartitionResp(
+              new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())
+                  .setMessage(
+                      String.format(
+                          "Create DataPartition failed because the database: 
%s is not exists",
+                          database)),
+              false,
+              null);
+        } else {
+          TDatabaseSchema databaseSchema = new TDatabaseSchema(database);
+          ConfigNodeRPCServiceProcessor.validateDatabaseSchema(databaseSchema);
+          DatabaseSchemaPlan databaseSchemaPlan =
+              new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, 
databaseSchema);
+          TSStatus tsStatus = 
consensusWritePartitionResult(databaseSchemaPlan);
+          LOGGER.info("Auto create database {}: {}", database, tsStatus);
+          if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              && !isDatabaseExist(database)) {
+            return new DataPartitionResp(tsStatus, false, null);
+          }
+        }
       }
     }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 8103cffd2a4..843065269d8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -285,8 +285,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     return configManager.showVariables();
   }
 
-  @Override
-  public TSStatus setDatabase(TDatabaseSchema databaseSchema) {
+  public static TSStatus validateDatabaseSchema(TDatabaseSchema 
databaseSchema) {
     TSStatus errorResp = null;
     boolean isSystemDatabase = 
databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE);
 
@@ -362,6 +361,15 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     // The maxRegionGroupNum is equal to the minRegionGroupNum when initialize
     
databaseSchema.setMaxSchemaRegionGroupNum(databaseSchema.getMinSchemaRegionGroupNum());
     
databaseSchema.setMaxDataRegionGroupNum(databaseSchema.getMinDataRegionGroupNum());
+    return errorResp;
+  }
+
+  @Override
+  public TSStatus setDatabase(TDatabaseSchema databaseSchema) {
+    TSStatus errorResp = validateDatabaseSchema(databaseSchema);
+    if (errorResp != null) {
+      return errorResp;
+    }
 
     DatabaseSchemaPlan setPlan =
         new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, 
databaseSchema);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 0d823abe0e7..200778afc9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -107,6 +107,8 @@ public class PipeConnectorConstant {
   public static final int CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE = 
16;
   public static final String CONNECTOR_EXTERNAL_USER_NAME_KEY = 
"connector.external.user-name";
   public static final String CONNECTOR_EXTERNAL_USER_NAME_DEFAULT_VALUE = 
"root";
+  public static final String CONNECTOR_EXTERNAL_USER_PASSWORD_KEY = 
"connector.external.password";
+  public static final String CONNECTOR_EXTERNAL_USER_PASSWORD_DEFAULT_VALUE = 
"root";
 
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index a1eead9e6f5..19d62b569b5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.config.constant;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
+
 public class PipeExtractorConstant {
 
   public static final String EXTRACTOR_KEY = "extractor";
@@ -53,6 +55,13 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_REALTIME_MODE_FILE_VALUE = "file";
   public static final String EXTRACTOR_REALTIME_MODE_LOG_VALUE = "log";
   public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE = 
"forced-log";
+  public static final String EXTRACTOR_LOCAL_SPLIT_ENABLE_KEY = 
"extractor.local-split.enable";
+  public static final String EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_KEY =
+      "extractor.split.max-concurrent-file";
+  public static final int EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE = 
16;
+  public static final String EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_KEY =
+      "extractor.split.max-file-batch-size";
+  public static final long EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_DEFAULT_VALUE = 
4 * GB;
 
   private PipeExtractorConstant() {
     throw new IllegalStateException("Utility class");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 88e1f4d2b92..09c387b51be 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -76,8 +76,11 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -88,6 +91,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_CONFIG_NODES_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_NAME_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_NAME_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_PASSWORD_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_PASSWORD_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOCAL_SPLIT_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY;
@@ -134,6 +139,7 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
   private int splitMaxSize;
   private int maxConcurrentFileNum;
   private String targetUserName;
+  private String targetPassword;
 
   public IoTDBThriftAsyncConnector() {
     if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
@@ -189,6 +195,9 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
       targetUserName =
           parameters.getStringOrDefault(
               CONNECTOR_EXTERNAL_USER_NAME_KEY, 
CONNECTOR_EXTERNAL_USER_NAME_DEFAULT_VALUE);
+      targetPassword =
+          parameters.getStringOrDefault(
+              CONNECTOR_EXTERNAL_USER_PASSWORD_KEY, 
CONNECTOR_EXTERNAL_USER_PASSWORD_DEFAULT_VALUE);
     }
 
     if (isTabletBatchModeEnabled) {
@@ -452,13 +461,29 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
             false,
             splitMaxSize,
             maxConcurrentFileNum,
-            targetUserName);
+            targetUserName,
+            targetPassword);
     splitSender.start();
     LOGGER.info(
-        "Sending {} files to {} complete: {}",
+        "Sending {} files to {} complete",
         pipeTsFileInsertionEvent.getTsFiles().size(),
-        targetConfigNodes,
-        splitSender.getStatistic());
+        targetConfigNodes);
+
+    if (splitSender.getStatistic().hasP2Timeout) {
+      double throughput = splitSender.getStatistic().p2ThroughputMBPS();
+      Map<String, Object> param = new HashMap<>(2);
+      param.put(
+          PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS, 
splitSender.getStatistic().p2Timeout);
+      param.put(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY, 
throughput);
+      pipeTsFileInsertionEvent.getExtractorOnConnectorTimeout().apply(param);
+    } else {
+      double throughput = splitSender.getStatistic().p2ThroughputMBPS();
+      pipeTsFileInsertionEvent
+          .getExtractorOnConnectorSuccess()
+          .apply(
+              Collections.singletonMap(
+                  PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY, 
throughput));
+    }
   }
 
   private void transfer(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
index 72d8b80a292..6c5b222ca42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
@@ -37,7 +37,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class PipeBatchTsFileInsertionEvent extends EnrichedEvent
@@ -59,6 +61,17 @@ public class PipeBatchTsFileInsertionEvent extends 
EnrichedEvent
   private final AtomicBoolean[] isClosed;
 
   private TsFileListInsertionDataContainer dataContainer;
+  private long totalSize;
+
+  /**
+   * If the event times out when being transferred, the connector uses this 
call back to inform the
+   * extractor that the task size should be reduced or to start throttling.
+   */
+  private Function<Map<String, Object>, Void> extractorOnConnectorTimeout;
+
+  private Function<Map<String, Object>, Void> extractorOnConnectorSuccess;
+  public static final String CONNECTOR_THROUGHPUT_MBPS_KEY = 
"connector.throughput_MBPS";
+  public static final String CONNECTOR_TIMEOUT_MS = "connector.timeout_MS";
 
   public PipeBatchTsFileInsertionEvent(
       List<TsFileResource> resources, boolean isLoaded, boolean 
isGeneratedByPipe) {
@@ -86,6 +99,9 @@ public class PipeBatchTsFileInsertionEvent extends 
EnrichedEvent
 
     this.resources = resources;
     tsFiles = 
resources.stream().map(TsFileResource::getTsFile).collect(Collectors.toList());
+    for (TsFileResource resource : resources) {
+      totalSize += resource.getTsFileSize();
+    }
     isClosed = new AtomicBoolean[resources.size()];
 
     this.isLoaded = isLoaded;
@@ -242,13 +258,13 @@ public class PipeBatchTsFileInsertionEvent extends 
EnrichedEvent
 
   @Override
   public String toString() {
-    return "PipeTsFileInsertionEvent{"
+    return "PipeBatchTsFileInsertionEvent{"
         + "resources="
         + resources
         + ", tsFiles="
         + tsFiles
-        + ", isClosed="
-        + isClosed
+        + ", totalSize="
+        + totalSize
         + '}';
   }
 
@@ -272,4 +288,22 @@ public class PipeBatchTsFileInsertionEvent extends 
EnrichedEvent
   protected boolean isTsFileResourceCoveredByTimeRange(TsFileResource 
resource) {
     return startTime <= resource.getFileStartTime() && endTime >= 
resource.getFileEndTime();
   }
+
+  public Function<Map<String, Object>, Void> getExtractorOnConnectorTimeout() {
+    return extractorOnConnectorTimeout;
+  }
+
+  public void setExtractorOnConnectorTimeout(
+      Function<Map<String, Object>, Void> extractorOnConnectorTimeout) {
+    this.extractorOnConnectorTimeout = extractorOnConnectorTimeout;
+  }
+
+  public Function<Map<String, Object>, Void> getExtractorOnConnectorSuccess() {
+    return extractorOnConnectorSuccess;
+  }
+
+  public void setExtractorOnConnectorSuccess(
+      Function<Map<String, Object>, Void> extractorOnConnectorSuccess) {
+    this.extractorOnConnectorSuccess = extractorOnConnectorSuccess;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index 927858421d5..94ada46cd08 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -49,11 +49,9 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOCAL_SPLIT_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_LOCAL_SPLIT_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
@@ -61,6 +59,10 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXT
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
@@ -133,13 +135,18 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
 
   private void constructHistoricalExtractor(PipeParameters parameters) {
     // Enable historical extractor by default
-    if (parameters.getBooleanOrDefault(CONNECTOR_LOCAL_SPLIT_ENABLE_KEY, 
false)) {
+    if (parameters.getBooleanOrDefault(EXTRACTOR_LOCAL_SPLIT_ENABLE_KEY, 
false)) {
+      LOGGER.info("Use batched extractor.");
       historicalExtractor =
           new BatchedTsFileExtractor(
               parameters.getIntOrDefault(
-                  CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY,
-                  CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE));
+                  EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_KEY,
+                  EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE),
+              parameters.getLongOrDefault(
+                  EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_KEY,
+                  EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_DEFAULT_VALUE));
     } else {
+      LOGGER.info("Use single extractor.");
       historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
index 3a5deeda141..4ed5e894b53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
@@ -29,6 +29,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 
 /**
  * Similar to the base class, but it batches several files as an event to 
enable further
@@ -38,9 +41,13 @@ public class BatchedTsFileExtractor extends 
PipeHistoricalDataRegionTsFileExtrac
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BatchedTsFileExtractor.class);
   private int maxBatchSize;
+  private long maxFileTotalSize;
+  private ThroughputMonitor throughputMonitor;
 
-  public BatchedTsFileExtractor(int maxBatchSize) {
+  public BatchedTsFileExtractor(int maxBatchSize, long maxFileTotalSize) {
     this.maxBatchSize = Math.max(1, maxBatchSize);
+    this.maxFileTotalSize = maxFileTotalSize;
+    this.throughputMonitor = new ThroughputMonitor();
   }
 
   @Override
@@ -52,10 +59,16 @@ public class BatchedTsFileExtractor extends 
PipeHistoricalDataRegionTsFileExtrac
     if (resource == null) {
       return null;
     }
+    long totalFileSize = 0;
     List<TsFileResource> tsFileResourceList = new ArrayList<>(maxBatchSize);
     tsFileResourceList.add(resource);
-    while (!pendingQueue.isEmpty() && tsFileResourceList.size() < 
maxBatchSize) {
-      tsFileResourceList.add(resource);
+    totalFileSize = resource.getTsFileSize();
+    while (!pendingQueue.isEmpty()
+        && tsFileResourceList.size() < maxBatchSize
+        && totalFileSize < maxFileTotalSize) {
+      TsFileResource poll = pendingQueue.poll();
+      tsFileResourceList.add(poll);
+      totalFileSize += poll.getTsFileSize();
     }
 
     final PipeBatchTsFileInsertionEvent event =
@@ -83,6 +96,35 @@ public class BatchedTsFileExtractor extends 
PipeHistoricalDataRegionTsFileExtrac
       }
     }
 
+    event.setExtractorOnConnectorTimeout(this::onConnectorTimeout);
+    event.setExtractorOnConnectorSuccess(this::onConnectorSuccess);
+    LOGGER.info("Generated a TsFile event: {}", event);
     return event;
   }
+
+  public Void onConnectorTimeout(Map<String, Object> parameters) {
+    long timeout = (long) 
parameters.get(PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS);
+    double throughput =
+        (double) 
parameters.get(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY);
+    throughputMonitor.record(System.currentTimeMillis(), throughput);
+    double avg = throughputMonitor.calculateAverage();
+    LOGGER.info(
+        "Connector timed out: throughput={}MB/s, timeout={}ms, 
estimatedThroughput={}MB/s",
+        throughput,
+        timeout,
+        avg);
+    if (!Double.isNaN(avg)) {
+      maxFileTotalSize = (long) (timeout / 1000.0 * avg * 0.9 * MB);
+      LOGGER.info("Connector timed out: newMaxFileSize={}", maxFileTotalSize);
+    }
+    return null;
+  }
+
+  public Void onConnectorSuccess(Map<String, Object> parameters) {
+    double throughput =
+        (double) 
parameters.get(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY);
+    LOGGER.info("Connector succeeds with throughput: {}MB/s", throughput);
+    throughputMonitor.record(System.currentTimeMillis(), throughput);
+    return null;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/ThroughputMonitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/ThroughputMonitor.java
new file mode 100644
index 00000000000..792e0c467a3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/ThroughputMonitor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.pipe.extractor.historical;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+
+public class ThroughputMonitor {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(ThroughputMonitor.class);
+  private int maxRecordNum = 10000;
+  private int minRecordNumToAnalyze = 5;
+  /** <timestamp, throughput> */
+  private final Queue<Pair<Long, Double>> history = new 
ArrayDeque<>(maxRecordNum);
+
+  public void record(long timestamp, double throughput) {
+    synchronized (history) {
+      if (history.size() >= maxRecordNum) {
+        history.remove();
+      }
+      history.add(new Pair<>(timestamp, throughput));
+    }
+  }
+
+  public double calculateAverage() {
+    if (history.size() < minRecordNumToAnalyze) {
+      logger.info("Only {} histories, do not update", history.size());
+      return Double.NaN;
+    }
+    List<Pair<Long, Double>> historyCopy;
+    synchronized (history) {
+      historyCopy = new ArrayList<>(history);
+    }
+    logger.info("{} histories before purging", historyCopy.size());
+    historyCopy = purge(historyCopy);
+    logger.info("{} histories after purging", historyCopy.size());
+    if (historyCopy.size() < minRecordNumToAnalyze) {
+      return Double.NaN;
+    }
+    return historyCopy.stream().mapToDouble(p -> 
p.right).average().orElse(Double.NaN);
+  }
+
+  private List<Pair<Long, Double>> purge(List<Pair<Long, Double>> history) {
+    history = purgeByTimeSpan(history);
+    if (history.size() < minRecordNumToAnalyze) {
+      return history;
+    }
+    return purgeByStderr(history);
+  }
+
+  private List<Pair<Long, Double>> purgeByStderr(List<Pair<Long, Double>> 
history) {
+    double sum = 0.0;
+    for (Pair<Long, Double> longDoublePair : history) {
+      sum += longDoublePair.right;
+    }
+
+    while (true) {
+      double average = sum / history.size();
+      double stdErrSum = 0.0;
+      int largestStdErrIdx = 0;
+      double largestStdErr = 0.0;
+      for (int i = 0; i < history.size(); i++) {
+        Pair<Long, Double> longDoublePair = history.get(i);
+        Double throughput = longDoublePair.right;
+        double stdErr = Math.sqrt((throughput - average) * (throughput - 
average));
+        stdErrSum += stdErr;
+        if (stdErr > largestStdErr) {
+          largestStdErrIdx = i;
+          largestStdErr = stdErr;
+        }
+      }
+      double stdErrAvg = stdErrSum / history.size();
+
+      if (largestStdErr > stdErrAvg * 3.0) {
+        history.remove(largestStdErrIdx);
+        if (history.size() < minRecordNumToAnalyze) {
+          return history;
+        }
+      } else {
+        break;
+      }
+    }
+
+    return history;
+  }
+
+  private List<Pair<Long, Double>> purgeByTimeSpan(List<Pair<Long, Double>> 
history) {
+    long maxTimeSpan = 3600 * 1000L;
+    long currTime = System.currentTimeMillis();
+    long timeThreshold = currTime - maxTimeSpan;
+    int i = 0;
+    for (; i < history.size(); i++) {
+      if (history.get(i).left >= timeThreshold) {
+        break;
+      }
+    }
+    if (i > 0) {
+      history = history.subList(i, history.size());
+    }
+    return history;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 05119db0ca2..3b2bcb4cbcf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileBatchInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -122,6 +123,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
       } else if (event instanceof TsFileInsertionEvent) {
         outputPipeConnector.transfer((TsFileInsertionEvent) event);
         PipeConnectorMetrics.getInstance().markTsFileEvent(taskID);
+      } else if (event instanceof TsFileBatchInsertionEvent) {
+        outputPipeConnector.transfer((TsFileBatchInsertionEvent) event);
+        PipeConnectorMetrics.getInstance().markTsFileEvent(taskID);
       } else if (event instanceof PipeHeartbeatEvent) {
         try {
           outputPipeConnector.heartbeat();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 29fd5c4a8a7..e52f8ff0a63 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
 import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
@@ -1089,4 +1090,9 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
           .orElse(false);
     }
   }
+
  /**
   * Supplies a {@link ConfigNodeClient}, e.g. by borrowing one from a client manager.
   * Declared to throw so suppliers backed by a client pool or thrift transport can surface
   * their acquisition failures to the caller.
   */
  @FunctionalInterface
  public interface ConfigNodeClientProvider {
    ConfigNodeClient supply() throws ClientManagerException, TException;
  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ExternalClientSession.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ExternalClientSession.java
new file mode 100644
index 00000000000..701945ecf8e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ExternalClientSession.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.session;
+
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/** Client session from another IoTDB cluster, such as the other end of a 
pipe. */
+public class ExternalClientSession extends IClientSession {
+
+  // Pipe task UUId
+  private final String clientID;
+
+  private final Map<Long, Set<Long>> statementIdToQueryId = new 
ConcurrentHashMap<>();
+
+  public ExternalClientSession(String clientID) {
+    this.clientID = clientID;
+  }
+
+  @Override
+  public String getClientAddress() {
+    return clientID;
+  }
+
+  @Override
+  public int getClientPort() {
+    return 0;
+  }
+
+  @Override
+  TSConnectionType getConnectionType() {
+    return TSConnectionType.INTERNAL;
+  }
+
+  @Override
+  String getConnectionId() {
+    return clientID;
+  }
+
+  @Override
+  public Set<Long> getStatementIds() {
+    return statementIdToQueryId.keySet();
+  }
+
+  @Override
+  public void addStatementId(long statementId) {
+    statementIdToQueryId.computeIfAbsent(statementId, sid -> new 
CopyOnWriteArraySet<>());
+  }
+
+  @Override
+  public Set<Long> removeStatementId(long statementId) {
+    return statementIdToQueryId.remove(statementId);
+  }
+
+  @Override
+  public void addQueryId(Long statementId, long queryId) {
+    Set<Long> queryIds = statementIdToQueryId.get(statementId);
+    if (queryIds == null) {
+      throw new IllegalStateException(
+          "StatementId: " + statementId + "doesn't exist in this session " + 
this);
+    }
+    queryIds.add(queryId);
+  }
+
+  @Override
+  public void removeQueryId(Long statementId, Long queryId) {
+    Set<Long> queryIds = statementIdToQueryId.get(statementId);
+    if (queryIds != null) {
+      queryIds.remove(queryId);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index eafec00cd8d..8e1a9485d8f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -308,9 +308,11 @@ public class SessionManager implements SessionManagerMBean 
{
    */
  public void removeCurrSession() {
    IClientSession session = currSession.get();
    // guard against a thread that never registered a session (or a double close): only
    // clear the thread-locals when a session is actually bound to this thread
    if (session != null) {
      sessions.remove(session);
      currSession.remove();
      currSessionIdleTime.remove();
    }
  }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index a2a6289f74f..68b21458c26 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -24,11 +24,14 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -61,11 +64,14 @@ import 
org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.protocol.session.ExternalClientSession;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.protocol.thrift.OperationType;
+import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult;
 import org.apache.iotdb.db.queryengine.execution.executor.RegionReadExecutor;
 import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
@@ -73,6 +79,10 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceFailur
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
+import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
 import 
org.apache.iotdb.db.queryengine.execution.operator.schema.source.ISchemaSource;
 import 
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -82,6 +92,8 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaC
 import 
org.apache.iotdb.db.queryengine.plan.analyze.partition.ClusterPartitionFetcher;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator.ValidatingSchema;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
@@ -204,9 +216,13 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.trigger.api.enums.FailureStrategy;
 import org.apache.iotdb.trigger.api.enums.TriggerEvent;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import com.google.common.collect.ImmutableList;
@@ -218,6 +234,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -384,27 +401,151 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     throw new UnsupportedOperationException();
   }
 
+  private ValidatingSchema extractValidatingSchema(LoadTsFilePieceNode 
pieceNode)
+      throws IllegalPathException {
+    ValidatingSchema validatingSchema = new ValidatingSchema();
+    String currDevice = null;
+    String currMeasurement = null;
+    List<String> measurements = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    List<CompressionType> compressionTypes = new ArrayList<>();
+    boolean isAligned = false;
+    for (TsFileData data : pieceNode.getAllTsFileData()) {
+      if (data.isModification()) {
+        continue;
+      }
+
+      ChunkData chunkData = (ChunkData) data;
+      String device = chunkData.getDevice();
+      String measurement = chunkData.firstMeasurement();
+      if (currDevice == null) {
+        currDevice = device;
+        currMeasurement = measurement;
+      } else if (!currDevice.equals(device)) {
+        validatingSchema.devicePaths.add(new PartialPath(currDevice));
+        validatingSchema.measurements.add(measurements.toArray(new String[0]));
+        validatingSchema.dataTypes.add(dataTypes.toArray(new TSDataType[0]));
+        validatingSchema.encodings.add(encodings.toArray(new TSEncoding[0]));
+        validatingSchema.compressionTypes.add(compressionTypes.toArray(new 
CompressionType[0]));
+        validatingSchema.isAlignedList.add(isAligned);
+        currDevice = device;
+        currMeasurement = measurement;
+        measurements.clear();
+        ;
+        dataTypes.clear();
+        encodings.clear();
+        compressionTypes.clear();
+      } else if (currMeasurement.equals(measurement)) {
+        continue;
+      }
+
+      if (chunkData.isAligned()) {
+        AlignedChunkData alignedChunkData = (AlignedChunkData) chunkData;
+        for (ChunkHeader header : alignedChunkData.getChunkHeaderList()) {
+          measurements.add(header.getMeasurementID());
+          dataTypes.add(header.getDataType());
+          encodings.add(header.getEncodingType());
+          compressionTypes.add(header.getCompressionType());
+        }
+        isAligned = true;
+      } else {
+        NonAlignedChunkData nonAlignedChunkData = (NonAlignedChunkData) 
chunkData;
+        ChunkHeader header = nonAlignedChunkData.getChunkHeader();
+        measurements.add(header.getMeasurementID());
+        dataTypes.add(header.getDataType());
+        encodings.add(header.getEncodingType());
+        compressionTypes.add(header.getCompressionType());
+        isAligned = false;
+      }
+    }
+
+    validatingSchema.devicePaths.add(new PartialPath(currDevice));
+    validatingSchema.measurements.add(measurements.toArray(new String[0]));
+    validatingSchema.dataTypes.add(dataTypes.toArray(new TSDataType[0]));
+    validatingSchema.encodings.add(encodings.toArray(new TSEncoding[0]));
+    validatingSchema.compressionTypes.add(compressionTypes.toArray(new 
CompressionType[0]));
+    validatingSchema.isAlignedList.add(isAligned);
+    return validatingSchema;
+  }
+
  /**
   * Receives one piece of a TsFile being loaded: optionally uncompresses and deserializes it,
   * optionally registers/validates its schema (first receiver only), writes it into the local
   * data region, and relays it to the other replicas when relay targets are attached.
   */
  @Override
  public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) throws TException {
    LOGGER.debug("Receive load node from uuid {} of {}.", req.uuid, req.consensusGroupId);

    ConsensusGroupId groupId =
        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);

    // session keyed by the pipe/load task uuid; used for schema validation below
    IClientSession session = new ExternalClientSession(req.uuid);
    try {
      // uncompress only when the sender set a compression type; otherwise read in place
      ByteBuffer buffer =
          req.isSetCompressionType()
              ? TsFileUtils.uncompress(
                  req.uncompressedLength,
                  CompressionType.deserialize(req.compressionType),
                  req.body)
              : req.body.slice();
      LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(buffer);
      if (pieceNode == null) {
        return createTLoadResp(
            new TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
      }

      // schema registration is requested only on the first hop and only honored when
      // auto-create-schema is enabled on this node
      if (req.needSchemaRegistration
          && IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
        TSStatus status = AuthorityChecker.checkUser(req.getUsername(), req.getPassword());
        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          return createTLoadResp(status);
        }

        try {
          ValidatingSchema validatingSchema = extractValidatingSchema(pieceNode);
          SESSION_MANAGER.registerSession(session);
          SESSION_MANAGER.supplySession(
              session, req.getUsername(), ZoneId.systemDefault().getId(), ClientVersion.V_1_0);

          MPPQueryContext queryContext =
              new MPPQueryContext(
                  "",
                  COORDINATOR.createQueryId(),
                  SESSION_MANAGER.getSessionInfo(session),
                  DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
                  DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT);
          SchemaValidator.validate(schemaFetcher, validatingSchema, queryContext);
        } catch (IllegalPathException e) {
          throw new TException(e);
        }
      }

      TSStatus resultStatus =
          StorageEngine.getInstance()
              .writeLoadTsFileNode((DataRegionId) groupId, pieceNode, req.uuid);

      if (req.relayTargets != null) {
        // forward the piece to the other replicas; clear relayTargets and
        // needSchemaRegistration first so the forwarded request is neither re-relayed nor
        // re-registered by the receivers
        TRegionReplicaSet replicaSet = req.relayTargets;
        req.relayTargets = null;
        req.needSchemaRegistration = false;
        for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
          TEndPoint internalEndPoint = dataNodeLocation.getInternalEndPoint();
          try (SyncDataNodeInternalServiceClient client =
              COORDINATOR.getInternalServiceClientManager().borrowClient(internalEndPoint)) {
            // NOTE(review): the relayed call's response is ignored — confirm replica
            // failures are surfaced elsewhere (e.g. in the second phase)
            client.sendTsFilePieceNode(req);
          }
        }
      }
      return createTLoadResp(resultStatus);
    } catch (ClientManagerException | IOException e) {
      throw new TException(e);
    } finally {
      // runs even when registration was skipped; presumably closeSession/removeCurrSession
      // tolerate an unregistered session — TODO confirm
      SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution);
      SESSION_MANAGER.removeCurrSession();
    }
  }
 
   @Override
   public TLoadResp sendLoadCommand(TLoadCommandReq req) {
+    LOGGER.info("Receive load command for {}", req.getUuid());
+
     return createTLoadResp(
         StorageEngine.getInstance()
             .executeLoadCommand(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
index 307a7e9d9e5..33317ffb895 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
@@ -121,7 +121,11 @@ public class QueryId {
 
     for (int i = 0; i < id.length(); i++) {
       char c = id.charAt(i);
-      if (!(c == '_' || c >= 'a' && c <= 'z' || c >= '0' && c <= '9')) {
+      if (!(c == '_'
+          || c >= 'A' && c <= 'Z'
+          || c >= 'a' && c <= 'z'
+          || c >= '0' && c <= '9'
+          || c == '-')) {
         return false;
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 5baf1de9314..04b8fdb7c6b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -50,6 +50,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -76,6 +77,7 @@ public class LoadTsFileManager {
 
   private final ScheduledExecutorService cleanupExecutors;
   private final Map<String, ScheduledFuture<?>> uuid2Future;
+  private final Set<String> completedTasks;
 
   public LoadTsFileManager() {
     this.loadDir = 
SystemFileFactory.INSTANCE.getFile(CONFIG.getLoadTsFileDir());
@@ -83,6 +85,7 @@ public class LoadTsFileManager {
     this.cleanupExecutors =
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, 
LoadTsFileManager.class.getName());
     this.uuid2Future = new ConcurrentHashMap<>();
+    this.completedTasks = new ConcurrentSkipListSet<>();
 
     recover();
   }
@@ -137,16 +140,24 @@ public class LoadTsFileManager {
 
  /**
   * Second-phase commit for one load task: flushes all writers of the given uuid into their
   * data regions. Idempotent for retries: completedTasks is consulted before the writer map
   * because clean() both records completion and removes the writer manager.
   *
   * @return true when loaded now or already completed earlier; false when the uuid is unknown
   */
  public boolean loadAll(String uuid, boolean isGeneratedByPipe)
      throws IOException, LoadFileException {
    TsFileWriterManager tsFileWriterManager = uuid2WriterManager.get(uuid);
    if (completedTasks.contains(uuid)) {
      // retry of an already-finished task: report success without redoing any work
      return true;
    }
    if (tsFileWriterManager == null) {
      return false;
    }
    tsFileWriterManager.loadAll(isGeneratedByPipe);
    clean(uuid);
    return true;
  }
 
   public boolean deleteAll(String uuid) {
-    if (!uuid2WriterManager.containsKey(uuid)) {
+    TsFileWriterManager tsFileWriterManager = uuid2WriterManager.get(uuid);
+    if (completedTasks.contains(uuid)) {
+      return true;
+    }
+    if (tsFileWriterManager == null) {
       return false;
     }
     clean(uuid);
@@ -154,10 +165,17 @@ public class LoadTsFileManager {
   }
 
   private void clean(String uuid) {
-    uuid2WriterManager.get(uuid).close();
-    uuid2WriterManager.remove(uuid);
-    uuid2Future.get(uuid).cancel(true);
-    uuid2Future.remove(uuid);
+    // record completed tasks for second phase retry
+    completedTasks.add(uuid);
+
+    TsFileWriterManager writerManager = uuid2WriterManager.remove(uuid);
+    if (writerManager != null) {
+      writerManager.close();
+    }
+    ScheduledFuture<?> future = uuid2Future.remove(uuid);
+    if (future != null) {
+      future.cancel(true);
+    }
 
     final Path loadDirPath = loadDir.toPath();
     if (!Files.exists(loadDirPath)) {
@@ -267,20 +285,25 @@ public class LoadTsFileManager {
       receivedSplitIds.add(deletionData.getSplitId());
     }
 
    /**
     * Ends every per-partition TsFile and hands it to its data region. Synchronized so a
     * concurrent retry and close() cannot interleave with the flush.
     */
    private synchronized void loadAll(boolean isGeneratedByPipe)
        throws IOException, LoadFileException {
      if (isClosed) {
        // this command is a retry and the previous command has loaded the task
        // treat this one as a success
        return;
      }
      for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
        TsFileIOWriter writer = entry.getValue();
        if (writer.isWritingChunkGroup()) {
          writer.endChunkGroup();
        }
        // skip writers whose file was already ended (e.g. by a previous partial attempt)
        if (writer.canWrite()) {
          writer.endFile();
          entry
              .getKey()
              .getDataRegion()
              .loadNewTsFile(generateResource(writer), true, isGeneratedByPipe);
        }
      }
    }
 
@@ -290,7 +313,7 @@ public class LoadTsFileManager {
       return tsFileResource;
     }
 
-    private void close() {
+    private synchronized void close() {
       if (isClosed) {
         return;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index d6f0ac87e79..50686d37b8d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -66,9 +67,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 import static 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
 
 public class TsFileSplitSender {
@@ -84,13 +87,13 @@ public class TsFileSplitSender {
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
   // All consensus groups accessed in Phase1 should be notified in Phase2
-  private final Set<TRegionReplicaSet> allReplicaSets = new 
ConcurrentSkipListSet<>();
+  private final Set<TDataNodeLocation> allReplicaSets = new 
ConcurrentSkipListSet<>();
   private String uuid;
   private LocationStatistics locationStatistics = new LocationStatistics();
   private boolean isGeneratedByPipe;
   private Map<Pair<LoadTsFilePieceNode, TRegionReplicaSet>, Exception> 
phaseOneFailures =
       new ConcurrentHashMap<>();
-  private Map<TRegionReplicaSet, Exception> phaseTwoFailures = new HashMap<>();
+  private Map<TDataNodeLocation, Exception> phaseTwoFailures = new HashMap<>();
   private long maxSplitSize;
   private PieceNodeSplitter pieceNodeSplitter = new 
ClusteringMeasurementSplitter(1.0, 10);
   //        private PieceNodeSplitter pieceNodeSplitter = new 
OrderedMeasurementSplitter();
@@ -100,6 +103,7 @@ public class TsFileSplitSender {
   private Queue<Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet>> 
splitFutures;
   private int maxConcurrentFileNum;
   private String userName;
+  private String password;
 
   public TsFileSplitSender(
       LoadTsFileNode loadTsFileNode,
@@ -109,7 +113,8 @@ public class TsFileSplitSender {
       boolean isGeneratedByPipe,
       long maxSplitSize,
       int maxConcurrentFileNum,
-      String userName) {
+      String userName,
+      String password) {
     this.loadTsFileNode = loadTsFileNode;
     this.targetPartitionFetcher = targetPartitionFetcher;
     this.targetPartitionInterval = targetPartitionInterval;
@@ -120,6 +125,9 @@ public class TsFileSplitSender {
     this.splitFutures = new ArrayDeque<>(MAX_PENDING_PIECE_NODE);
     this.maxConcurrentFileNum = maxConcurrentFileNum;
     this.userName = userName;
+    this.password = password;
+
+    this.statistic.totalSize = loadTsFileNode.getTotalSize();
   }
 
   public void start() throws IOException {
@@ -127,6 +135,7 @@ public class TsFileSplitSender {
     // skip files without data
     loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
     uuid = UUID.randomUUID().toString();
+    logger.info("Start to split {}", loadTsFileNode);
 
     boolean isFirstPhaseSuccess = firstPhase(loadTsFileNode);
     boolean isSecondPhaseSuccess = secondPhase(isFirstPhaseSuccess);
@@ -176,58 +185,79 @@ public class TsFileSplitSender {
         tsFileDataManager.sendAllTsFileData()
             && processRemainingPieceNodes()
             && phaseOneFailures.isEmpty();
-    logger.info("Cleanup ends after {}ms", System.currentTimeMillis() - start);
+    statistic.p1TimeMs = System.currentTimeMillis() - start;
+    logger.info("Cleanup ends after {}ms", statistic.p1TimeMs);
     return success;
   }
 
   private boolean secondPhase(boolean isFirstPhaseSuccess) {
-    logger.info("Start dispatching Load command for uuid {}", uuid);
     TLoadCommandReq loadCommandReq =
         new TLoadCommandReq(
             (isFirstPhaseSuccess ? LoadCommand.EXECUTE : 
LoadCommand.ROLLBACK).ordinal(), uuid);
     loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
 
-    for (TRegionReplicaSet replicaSet : allReplicaSets) {
-      loadCommandReq.setUseConsensus(true);
-      for (TDataNodeLocation dataNodeLocation : 
replicaSet.getDataNodeLocations()) {
-        TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
-        Exception groupException = null;
-        loadCommandReq.setConsensusGroupId(replicaSet.getRegionId());
-
-        for (int i = 0; i < MAX_RETRY; i++) {
-          try (SyncDataNodeInternalServiceClient client =
-              internalServiceClientManager.borrowClient(endPoint)) {
-            TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
-            if (!loadResp.isAccepted()) {
-              logger.warn(loadResp.message);
-              groupException = new 
FragmentInstanceDispatchException(loadResp.status);
-            }
-            break;
-          } catch (ClientManagerException | TException e) {
-            logger.warn(NODE_CONNECTION_ERROR, endPoint, e);
-            TSStatus status = new TSStatus();
-            status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-            status.setMessage(
-                "can't connect to node {}, please reset longer 
dn_connection_timeout_ms "
-                    + "in iotdb-common.properties and restart iotdb."
-                    + endPoint);
-            groupException = new FragmentInstanceDispatchException(status);
-          }
-          try {
-            Thread.sleep(RETRY_INTERVAL_MS);
-          } catch (InterruptedException e) {
-            groupException = e;
-            break;
-          }
-        }
-
-        if (groupException != null) {
-          phaseTwoFailures.put(replicaSet, groupException);
-        } else {
-          break;
-        }
+    long p2StartMS = System.currentTimeMillis();
+    List<Pair<TDataNodeLocation, Future<Void>>> loadFutures = new 
ArrayList<>();
+    AtomicBoolean hasTimeout = new AtomicBoolean();
+    for (TDataNodeLocation dataNodeLocation : allReplicaSets) {
+      loadFutures.add(
+          new Pair<>(
+              dataNodeLocation,
+              splitNodeService.submit(
+                  () -> {
+                    logger.info(
+                        "Start dispatching Load command for uuid {} to {}", 
uuid, dataNodeLocation);
+                    TEndPoint endPoint = 
dataNodeLocation.getInternalEndPoint();
+                    Exception locationException = null;
+
+                    for (int i = 0; i < MAX_RETRY; i++) {
+                      long timeout = 0;
+                      try (SyncDataNodeInternalServiceClient client =
+                          internalServiceClientManager.borrowClient(endPoint)) 
{
+                        timeout = client.getTimeout();
+                        TLoadResp loadResp = 
client.sendLoadCommand(loadCommandReq);
+                        if (!loadResp.isAccepted()) {
+                          logger.warn(loadResp.message);
+                          locationException =
+                              new 
FragmentInstanceDispatchException(loadResp.status);
+                        } else {
+                          locationException = null;
+                        }
+                        break;
+                      } catch (ClientManagerException | TException e) {
+                        logger.warn(NODE_CONNECTION_ERROR, endPoint, e);
+                        TSStatus status = new TSStatus();
+                        status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+                        status.setMessage(
+                            "can't connect to node " + endPoint
+                                + ", please reset longer dn_connection_timeout_ms "
+                                + "in iotdb-common.properties and restart iotdb.");
+                        locationException = new FragmentInstanceDispatchException(status);
+                        hasTimeout.set(true);
+                        statistic.p2Timeout = timeout;
+                      }
+                      try {
+                        Thread.sleep(RETRY_INTERVAL_MS);
+                      } catch (InterruptedException e) {
+                        // restore the interrupt status so the executor can observe it
+                        Thread.currentThread().interrupt();
+                        locationException = e;
+                        break;
+                      }
+                    }
+
+                    if (locationException != null) {
+                      phaseTwoFailures.put(dataNodeLocation, 
locationException);
+                    }
+                    return null;
+                  })));
+    }
+    for (Pair<TDataNodeLocation, Future<Void>> loadFuture : loadFutures) {
+      try {
+        loadFuture.right.get();
+      } catch (InterruptedException e) {
+        // preserve the interrupt status for the calling thread
+        Thread.currentThread().interrupt();
+        phaseTwoFailures.put(loadFuture.left, e);
+      } catch (ExecutionException e) {
+        phaseTwoFailures.put(loadFuture.left, e);
+      }
       }
     }
+    statistic.p2TimeMS = System.currentTimeMillis() - p2StartMS;
+    statistic.hasP2Timeout = hasTimeout.get();
 
     return phaseTwoFailures.isEmpty();
   }
@@ -280,10 +310,6 @@ public class TsFileSplitSender {
 
   private boolean dispatchPieceNodes(
       List<LoadTsFilePieceNode> subNodes, TRegionReplicaSet replicaSet) {
-    AtomicLong minDispatchTime = new AtomicLong(Long.MAX_VALUE);
-    AtomicLong maxDispatchTime = new AtomicLong(Long.MIN_VALUE);
-    AtomicLong sumDispatchTime = new AtomicLong();
-    AtomicLong sumCompressingTime = new AtomicLong();
 
     long start = System.nanoTime();
     List<Boolean> subNodeResults =
@@ -303,12 +329,12 @@ public class TsFileSplitSender {
                     return false;
                   }
                   long compressingTime = System.nanoTime() - startTime;
-                  sumCompressingTime.addAndGet(compressingTime);
-                  statistic.compressingTime.addAndGet(compressingTime);
+                  statistic.compressingTimeNs.addAndGet(compressingTime);
 
                   TTsFilePieceReq loadTsFileReq =
                       new TTsFilePieceReq(buffer, uuid, 
replicaSet.getRegionId());
-                  loadTsFileReq.isRelay = true;
+                  loadTsFileReq.setUsername(userName);
+                  loadTsFileReq.setPassword(password);
                   
loadTsFileReq.setCompressionType(compressionType.serialize());
                   loadTsFileReq.setUncompressedLength(uncompressedLength);
                   LocationSequencer locationSequencer = 
createLocationSequencer(replicaSet);
@@ -317,6 +343,11 @@ public class TsFileSplitSender {
                   Exception lastConnectionError = null;
                   TDataNodeLocation currLocation = null;
                   for (TDataNodeLocation location : locationSequencer) {
+                    TRegionReplicaSet relaySet = new 
TRegionReplicaSet(replicaSet);
+                    relaySet.getDataNodeLocations().remove(location);
+                    loadTsFileReq.setRelayTargets(relaySet);
+                    loadTsFileReq.setNeedSchemaRegistration(true);
+
                     if (location.getDataNodeId() == 0 && 
logger.isDebugEnabled()) {
                       locationStatistics.logLocationStatistics();
                       logger.info("Chose location {}", 
location.getDataNodeId());
@@ -370,52 +401,22 @@ public class TsFileSplitSender {
                   locationStatistics.updateThroughput(
                       currLocation, node.getDataSize(), timeConsumption);
 
-                  synchronized (maxDispatchTime) {
-                    if (maxDispatchTime.get() < timeConsumption) {
-                      maxDispatchTime.set(timeConsumption);
-                    }
-                  }
-                  synchronized (minDispatchTime) {
-                    if (minDispatchTime.get() > timeConsumption) {
-                      minDispatchTime.set(timeConsumption);
-                    }
-                  }
-                  sumDispatchTime.addAndGet(timeConsumption);
                   return true;
                 })
             .collect(Collectors.toList());
     long elapsedTime = System.nanoTime() - start;
-    statistic.dispatchNodesTime.addAndGet(elapsedTime);
-    logger.debug(
-        "Dispatched one node after {}ms, min/maxDispatching time: {}/{}ns, 
avg: {}ns, sum: {}ns, compressing: {}ns",
-        elapsedTime / 1_000_000L,
-        minDispatchTime.get(),
-        maxDispatchTime.get(),
-        sumDispatchTime.get() / subNodes.size(),
-        sumDispatchTime.get(),
-        sumCompressingTime.get());
+    statistic.dispatchNodesTimeNS.addAndGet(elapsedTime);
     return !subNodeResults.contains(false);
   }
 
   public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, 
TRegionReplicaSet replicaSet) {
     long allStart = System.nanoTime();
-    allReplicaSets.add(replicaSet);
-    if (false) {
-      long start = System.nanoTime();
-      List<LoadTsFilePieceNode> split = pieceNodeSplitter.split(pieceNode);
-      long elapsedTime = System.nanoTime() - start;
-      statistic.splitTime.addAndGet(elapsedTime);
-      statistic.pieceNodeNum.incrementAndGet();
-      logger.debug("{} splits are generated after {}ms", split.size(), 
elapsedTime / 1_000_000L);
-      boolean success = dispatchPieceNodes(split, replicaSet);
-      statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
-      return success;
-    }
+    allReplicaSets.addAll(replicaSet.getDataNodeLocations());
 
     List<LoadTsFilePieceNode> subNodes;
     if (splitFutures.size() < MAX_PENDING_PIECE_NODE) {
       splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode), 
replicaSet));
-      statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+      statistic.dispatchNodeTimeNS.addAndGet(System.nanoTime() - allStart);
       return true;
     } else {
       long start = System.nanoTime();
@@ -434,7 +435,7 @@ public class TsFileSplitSender {
         return false;
       }
       boolean success = dispatchPieceNodes(subNodes, pair.right);
-      statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+      statistic.dispatchNodeTimeNS.addAndGet(System.nanoTime() - allStart);
       return success;
     }
   }
@@ -447,24 +448,42 @@ public class TsFileSplitSender {
     public AtomicLong compressedSize = new AtomicLong();
     public AtomicLong splitTime = new AtomicLong();
     public AtomicLong pieceNodeNum = new AtomicLong();
-    public AtomicLong dispatchNodesTime = new AtomicLong();
-    public AtomicLong dispatchNodeTime = new AtomicLong();
-    public AtomicLong compressingTime = new AtomicLong();
+    public AtomicLong dispatchNodesTimeNS = new AtomicLong();
+    public AtomicLong dispatchNodeTimeNS = new AtomicLong();
+    public AtomicLong compressingTimeNs = new AtomicLong();
+    public long p1TimeMs;
+    public long p2TimeMS;
+    public long totalSize;
+    public boolean hasP2Timeout;
+    public long p2Timeout;
 
     public void logStatistic() {
-      logger.info("Time consumption: {}ms", taskEndTime - taskStartTime);
+      logger.info(
+          "Time consumption: {}ms, totalSize: {}MB",
+          taskEndTime - taskStartTime,
+          totalSize * 1.0 / MB);
       logger.info(
           "Generated {} piece nodes, splitTime: {}, dispatchSplitsTime: {}, 
dispatchNodeTime: {}",
           pieceNodeNum.get(),
           splitTime.get() / 1_000_000L,
-          dispatchNodesTime.get() / 1_000_000L,
-          dispatchNodeTime.get() / 1_000_000L);
+          dispatchNodesTimeNS.get() / 1_000_000L,
+          dispatchNodeTimeNS.get() / 1_000_000L);
       logger.info(
           "Transmission size: {}/{} ({}), compressionTime: {}ms",
           compressedSize.get(),
           rawSize.get(),
           compressedSize.get() * 1.0 / rawSize.get(),
-          compressingTime.get() / 1_000_000L);
+          compressingTimeNs.get() / 1_000_000L);
+      logger.info("Sync TsFile time: {}ms ({})", p1TimeMs, p1ThroughputMBPS());
+      logger.info("Load command execution time: {}ms ({})", p2TimeMS, 
p2ThroughputMBPS());
+    }
+
+    public double p2ThroughputMBPS() {
+      return totalSize * 1.0 / MB / (p2TimeMS / 1000.0);
+    }
+
+    public double p1ThroughputMBPS() {
+      return totalSize * 1.0 / MB / (p1TimeMs / 1000.0);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 308d7ea221b..e3b7d7b0615 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -25,9 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
-import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.partition.DataPartition;
@@ -50,6 +48,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import 
org.apache.iotdb.db.protocol.client.ConfigNodeClient.ConfigNodeClientProvider;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils;
@@ -75,6 +74,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class PartitionCache {
+
   private static final Logger logger = 
LoggerFactory.getLogger(PartitionCache.class);
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
   private static final List<String> ROOT_PATH = Arrays.asList("root", "**");
@@ -107,8 +107,8 @@ public class PartitionCache {
 
   private final ReentrantReadWriteLock regionReplicaSetLock = new 
ReentrantReadWriteLock();
 
-  private final IClientManager<ConfigRegionId, ConfigNodeClient> 
configNodeClientManager =
-      ConfigNodeClientManager.getInstance();
+  private ConfigNodeClientProvider configNodeClientProvider =
+      () -> 
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
   private final CacheMetrics cacheMetrics;
 
   public PartitionCache() {
@@ -120,6 +120,11 @@ public class PartitionCache {
     this.cacheMetrics = new CacheMetrics();
   }
 
+  public PartitionCache(ConfigNodeClientProvider configNodeClientProvider) {
+    this();
+    this.configNodeClientProvider = configNodeClientProvider;
+  }
+
   // region database cache
 
   /**
@@ -191,8 +196,7 @@ public class PartitionCache {
   private void fetchStorageGroupAndUpdateCache(
       StorageGroupCacheResult<?> result, List<String> devicePaths)
       throws ClientManagerException, TException {
-    try (ConfigNodeClient client =
-        configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) 
{
+    try (ConfigNodeClient client = configNodeClientProvider.supply()) {
       storageGroupCacheLock.writeLock().lock();
       result.reset();
       getStorageGroupMap(result, devicePaths, true);
@@ -222,8 +226,7 @@ public class PartitionCache {
   private void createStorageGroupAndUpdateCache(
       StorageGroupCacheResult<?> result, List<String> devicePaths, String 
userName)
       throws ClientManagerException, MetadataException, TException {
-    try (ConfigNodeClient client =
-        configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) 
{
+    try (ConfigNodeClient client = configNodeClientProvider.supply()) {
       storageGroupCacheLock.writeLock().lock();
       // try to check whether database need to be created
       result.reset();
@@ -416,6 +419,7 @@ public class PartitionCache {
   // endregion
 
   // region replicaSet cache
+
   /**
    * get regionReplicaSet from local and confignode
    *
@@ -439,8 +443,7 @@ public class PartitionCache {
         regionReplicaSetLock.writeLock().lock();
         // verify that there are not hit in cache
         if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
-          try (ConfigNodeClient client =
-              
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+          try (ConfigNodeClient client = configNodeClientProvider.supply()) {
             TRegionRouteMapResp resp = client.getLatestRegionRouteMap();
             if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
resp.getStatus().getCode()) {
               updateGroupIdToReplicaSetMap(resp.getTimestamp(), 
resp.getRegionRouteMap());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
index 8536457fbf2..98e50f2720b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
@@ -66,11 +66,11 @@ public abstract class BasicPartitionFetcher implements 
IPartitionFetcher {
 
   protected final PartitionCache partitionCache;
 
-  protected BasicPartitionFetcher(int seriesPartitionSlotNum) {
+  protected BasicPartitionFetcher(int seriesPartitionSlotNum, PartitionCache 
partitionCache) {
     this.partitionExecutor =
         SeriesPartitionExecutor.getSeriesPartitionExecutor(
             config.getSeriesPartitionExecutorClass(), seriesPartitionSlotNum);
-    this.partitionCache = new PartitionCache();
+    this.partitionCache = partitionCache;
   }
 
   protected abstract ConfigNodeClient getClient() throws 
ClientManagerException, TException;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
index f8d218bf71f..b38346c150c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.partition.PartitionCache;
 
 public class ClusterPartitionFetcher extends BasicPartitionFetcher {
   private final IClientManager<ConfigRegionId, ConfigNodeClient> 
configNodeClientManager =
@@ -42,7 +43,7 @@ public class ClusterPartitionFetcher extends 
BasicPartitionFetcher {
   }
 
   private ClusterPartitionFetcher() {
-    super(config.getSeriesPartitionSlotNum());
+    super(config.getSeriesPartitionSlotNum(), new PartitionCache());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ExternalPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ExternalPartitionFetcher.java
index ac1624c85c6..e9e7adb53b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ExternalPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ExternalPartitionFetcher.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.analyze.partition;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.partition.PartitionCache;
 
 import org.apache.thrift.TException;
 
@@ -40,7 +41,9 @@ public class ExternalPartitionFetcher extends 
BasicPartitionFetcher {
       List<TEndPoint> externalConfigNodes,
       ThriftClientProperty property,
       int seriesPartitionSlotNum) {
-    super(seriesPartitionSlotNum);
+    super(
+        seriesPartitionSlotNum,
+        new PartitionCache(() -> new ConfigNodeClient(externalConfigNodes, 
property, null)));
     this.externalConfigNodes = externalConfigNodes;
     this.property = property;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index 0558d375988..78bf3023943 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class SchemaValidator {
@@ -66,4 +67,26 @@ public class SchemaValidator {
     return schemaFetcher.fetchSchemaListWithAutoCreate(
         devicePaths, measurements, dataTypes, encodings, compressionTypes, 
isAlignedList, context);
   }
+
+  public static ISchemaTree validate(
+      ISchemaFetcher schemaFetcher, ValidatingSchema validatingSchema, 
MPPQueryContext context) {
+    return schemaFetcher.fetchSchemaListWithAutoCreate(
+        validatingSchema.devicePaths,
+        validatingSchema.measurements,
+        validatingSchema.dataTypes,
+        validatingSchema.encodings,
+        validatingSchema.compressionTypes,
+        validatingSchema.isAlignedList,
+        context);
+  }
+
+  public static class ValidatingSchema {
+
+    public List<PartialPath> devicePaths = new ArrayList<>();
+    public List<String[]> measurements = new ArrayList<>();
+    public List<TSDataType[]> dataTypes = new ArrayList<>();
+    public List<TSEncoding[]> encodings = new ArrayList<>();
+    public List<CompressionType[]> compressionTypes = new ArrayList<>();
+    public List<Boolean> isAlignedList = new ArrayList<>();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 6e9cc5bee33..808b97ab465 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -39,10 +39,14 @@ import java.util.Objects;
 public class LoadTsFileNode extends WritePlanNode {
 
   private final List<TsFileResource> resources;
+  private long totalSize;
 
   public LoadTsFileNode(PlanNodeId id, List<TsFileResource> resources) {
     super(id);
     this.resources = resources;
+    for (TsFileResource resource : resources) {
+      totalSize += resource.getTsFileSize();
+    }
   }
 
   @Override
@@ -119,4 +123,8 @@ public class LoadTsFileNode extends WritePlanNode {
   public TsFileResource lastResource() {
     return resources.get(resources.size() - 1);
   }
+
+  public long getTotalSize() {
+    return totalSize;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
index 2e2db3dd13e..249532770e6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
@@ -87,6 +87,7 @@ public class LoadTsFileManagerTest extends TestBase {
             false,
             maxSplitSize,
             100,
+            "root",
             "root");
     long start = System.currentTimeMillis();
     splitSender.start();
@@ -156,9 +157,9 @@ public class LoadTsFileManagerTest extends TestBase {
         dataRegionMap.get(req.consensusGroupId), pieceNode, req.uuid);
 
     // forward to other replicas in the group
-    if (req.isRelay) {
-      req.isRelay = false;
-      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+    if (req.relayTargets != null) {
+      TRegionReplicaSet regionReplicaSet = req.relayTargets;
+      req.relayTargets = null;
       regionReplicaSet.getDataNodeLocations().stream()
           .parallel()
           .forEach(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index 3f605a36247..bd903c45548 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -120,6 +120,7 @@ public class TsFileSplitSenderTest extends TestBase {
             false,
             maxSplitSize,
             100,
+            "root",
             "root");
     long start = System.currentTimeMillis();
     splitSender.start();
@@ -143,16 +144,16 @@ public class TsFileSplitSenderTest extends TestBase {
     long handleStart = System.nanoTime();
     if ((tEndpoint.getPort() - 10000) % 3 == 0
         && random.nextDouble() < packetLossRatio
-        && req.isRelay) {
+        && req.relayTargets != null) {
       throw new TException("Packet lost");
     }
     if ((tEndpoint.getPort() - 10000) % 3 == 1
         && random.nextDouble() < packetLossRatio / 2
-        && req.isRelay) {
+        && req.relayTargets != null) {
       throw new TException("Packet lost");
     }
 
-    if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay && 
stuckDurationMS > 0) {
+    if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null && 
stuckDurationMS > 0) {
       Pair<Long, Long> nextStuckTime =
           nextStuckTimeMap.computeIfAbsent(
               tEndpoint,
@@ -216,14 +217,14 @@ public class TsFileSplitSenderTest extends TestBase {
             .collect(Collectors.toList()));
 
     if (dummyDelayMS > 0) {
-      if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
+      if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null) {
         try {
           Thread.sleep(dummyDelayMS);
         } catch (InterruptedException e) {
           throw new RuntimeException(e);
         }
       }
-      if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.isRelay) {
+      if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.relayTargets != null) {
         try {
           Thread.sleep(dummyDelayMS / 2);
         } catch (InterruptedException e) {
@@ -233,10 +234,10 @@ public class TsFileSplitSenderTest extends TestBase {
     }
 
     // forward to other replicas in the group
-    if (req.isRelay) {
+    if (req.relayTargets != null) {
       long relayStart = System.nanoTime();
-      req.isRelay = false;
-      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+      TRegionReplicaSet regionReplicaSet = req.relayTargets;
+      req.relayTargets = null;
       regionReplicaSet.getDataNodeLocations().stream()
           .parallel()
           .forEach(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index e5a80f681be..edf464badae 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -57,9 +58,14 @@ public class PipeTaskMeta {
   private final Map<PipeRuntimeException, PipeRuntimeException> 
exceptionMessages =
       new ConcurrentHashMap<>();
 
+  private long createTime;
+  private long lastReportTime;
+
   public PipeTaskMeta(/* @NotNull */ ProgressIndex progressIndex, int 
leaderDataNodeId) {
     this.progressIndex.set(progressIndex);
     this.leaderDataNodeId.set(leaderDataNodeId);
+    this.createTime = System.currentTimeMillis();
+    this.lastReportTime = System.currentTimeMillis();
   }
 
   public ProgressIndex getProgressIndex() {
@@ -67,6 +73,7 @@ public class PipeTaskMeta {
   }
 
   public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) {
+    this.lastReportTime = System.currentTimeMillis();
     return progressIndex.updateAndGet(
         index -> index.updateToMinimumIsAfterProgressIndex(updateIndex));
   }
@@ -176,6 +183,11 @@ public class PipeTaskMeta {
         + leaderDataNodeId
         + ", exceptionMessages='"
         + exceptionMessages
+        + ", lastReportSince "
+        + new Date(createTime)
+        + " ("
+        + (lastReportTime - createTime) / 1000
+        + "s)"
         + "'}";
   }
 }
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
index a84c735e549..c54028d513c 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
@@ -62,11 +62,16 @@ public class TsFileUtils {
 
   public static ByteBuffer uncompressPage(
       PageHeader header, CompressionType type, ByteBuffer buffer) throws 
IOException {
-    if (header.getUncompressedSize() == 0 || type == 
CompressionType.UNCOMPRESSED) {
+    return uncompress(header.getUncompressedSize(), type, buffer);
+  }
+
+  public static ByteBuffer uncompress(int uncompressedSize, CompressionType 
type, ByteBuffer buffer)
+      throws IOException {
+    if (uncompressedSize == 0 || type == CompressionType.UNCOMPRESSED) {
       return buffer;
     } // FIXME if the buffer is not array-implemented.
     IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
-    ByteBuffer uncompressedBuffer = 
ByteBuffer.allocate(header.getUncompressedSize());
+    ByteBuffer uncompressedBuffer = ByteBuffer.allocate(uncompressedSize);
     unCompressor.uncompress(
         buffer.array(), buffer.position(), buffer.remaining(), 
uncompressedBuffer.array(), 0);
     return uncompressedBuffer;
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 20001fe5985..65f8ef4e410 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -308,10 +308,12 @@ struct TTsFilePieceReq{
     1: required binary body
     2: required string uuid
     3: required common.TConsensusGroupId consensusGroupId
-    // if isRelay is true, the receiver should forward the request to other 
replicas in the group
-    4: optional bool isRelay
+    4: optional common.TRegionReplicaSet relayTargets
     5: optional i8 compressionType
     6: optional i32 uncompressedLength
+    7: optional bool needSchemaRegistration
+    8: optional string username
+    9: optional string password
 }
 
 struct TLoadCommandReq{


Reply via email to