This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch insertion-fix-real
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/insertion-fix-real by this 
push:
     new 1479cfc924b Fixed multiple bugs of insertion (#17570)
1479cfc924b is described below

commit 1479cfc924b309c2d1165148b188fc0d330b13ad
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 12:26:42 2026 +0800

    Fixed multiple bugs of insertion (#17570)
    
    * insert-fix
    
    * source/sink
    
    * source/sink-2
    
    * sptls
    
    * fix
    
    * sink
    
    * compile
---
 .../pipe/sink/protocol/IoTDBConfigRegionSink.java  |   9 +-
 .../iotdb/db/pipe/agent/task/PipeDataNodeTask.java |  28 +--
 .../task/execution/PipeSinkSubtaskExecutor.java    |   2 +-
 .../agent/task/stage/PipeTaskProcessorStage.java   |  16 +-
 .../pipe/agent/task/stage/PipeTaskSinkStage.java   |  13 +-
 .../pipe/agent/task/stage/PipeTaskSourceStage.java |  19 +-
 .../airgap/IoTDBSchemaRegionAirGapSink.java        |   7 +-
 .../PipeTransferTabletBatchEventHandler.java       |  10 +-
 .../PipeTransferTabletInsertNodeEventHandler.java  |   2 +-
 .../PipeTransferTabletInsertionEventHandler.java   |   7 +-
 .../handler/PipeTransferTabletRawEventHandler.java |   6 +-
 .../handler/PipeTransferTrackableHandler.java      |  24 +-
 .../async/handler/PipeTransferTsFileHandler.java   |  27 +--
 .../realtime/assigner/DisruptorQueue.java          |   4 +-
 .../listener/PipeInsertionDataNodeListener.java    |  38 ++--
 .../planner/plan/node/write/InsertTabletNode.java  | 140 +++++++++++-
 .../db/storageengine/dataregion/DataRegion.java    | 248 +++++++++++++++------
 .../task/stage/SubscriptionTaskSinkStage.java      |  11 +-
 .../storageengine/dataregion/DataRegionTest.java   | 226 +++++++++++++++++++
 .../iotdb/commons/client/ClientPoolFactory.java    |   4 +-
 .../iotdb/commons/concurrent/ThreadName.java       |  12 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  93 ++++----
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |   8 +-
 23 files changed, 692 insertions(+), 262 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index 5ee983d7945..b5f62ef4ccf 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -88,7 +88,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink {
   protected PipeTransferFilePieceReq getTransferSingleFilePieceReq(
       final String fileName, final long position, final byte[] payLoad) {
     throw new UnsupportedOperationException(
-        "The config region connector does not support transferring single file 
piece req.");
+        "The config region sink does not support transferring single file 
piece req.");
   }
 
   @Override
@@ -105,13 +105,13 @@ public class IoTDBConfigRegionSink extends 
IoTDBSslSyncSink {
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
-        "IoTDBConfigRegionConnector can't transfer TabletInsertionEvent.");
+        "IoTDBConfigRegionSink can't transfer TabletInsertionEvent.");
   }
 
   @Override
   public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
-        "IoTDBConfigRegionConnector can't transfer TsFileInsertionEvent.");
+        "IoTDBConfigRegionSink can't transfer TsFileInsertionEvent.");
   }
 
   @Override
@@ -121,8 +121,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink 
{
     } else if (event instanceof PipeConfigRegionSnapshotEvent) {
       doTransferWrapper((PipeConfigRegionSnapshotEvent) event);
     } else if (!(event instanceof PipeHeartbeatEvent)) {
-      LOGGER.warn(
-          "IoTDBConfigRegionConnector does not support transferring generic 
event: {}.", event);
+      LOGGER.warn("IoTDBConfigRegionSink does not support transferring generic 
event: {}.", event);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
index d33ec44a86e..0d0b955c210 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
@@ -32,32 +32,32 @@ public class PipeDataNodeTask implements PipeTask {
   private final String pipeName;
   private final int regionId;
 
-  private final PipeTaskStage extractorStage;
+  private final PipeTaskStage sourceStage;
   private final PipeTaskStage processorStage;
-  private final PipeTaskStage connectorStage;
+  private final PipeTaskStage sinkStage;
 
   private volatile boolean isCompleted = false;
 
   public PipeDataNodeTask(
       final String pipeName,
       final int regionId,
-      final PipeTaskStage extractorStage,
+      final PipeTaskStage sourceStage,
       final PipeTaskStage processorStage,
-      final PipeTaskStage connectorStage) {
+      final PipeTaskStage sinkStage) {
     this.pipeName = pipeName;
     this.regionId = regionId;
 
-    this.extractorStage = extractorStage;
+    this.sourceStage = sourceStage;
     this.processorStage = processorStage;
-    this.connectorStage = connectorStage;
+    this.sinkStage = sinkStage;
   }
 
   @Override
   public void create() {
     final long startTime = System.currentTimeMillis();
-    extractorStage.create();
+    sourceStage.create();
     processorStage.create();
-    connectorStage.create();
+    sinkStage.create();
     LOGGER.info(
         "Create pipe DN task {} successfully within {} ms",
         this,
@@ -67,9 +67,9 @@ public class PipeDataNodeTask implements PipeTask {
   @Override
   public void drop() {
     final long startTime = System.currentTimeMillis();
-    extractorStage.drop();
+    sourceStage.drop();
     processorStage.drop();
-    connectorStage.drop();
+    sinkStage.drop();
     LOGGER.info(
         "Drop pipe DN task {} successfully within {} ms",
         this,
@@ -79,9 +79,9 @@ public class PipeDataNodeTask implements PipeTask {
   @Override
   public void start() {
     final long startTime = System.currentTimeMillis();
-    extractorStage.start();
+    sourceStage.start();
     processorStage.start();
-    connectorStage.start();
+    sinkStage.start();
     LOGGER.info(
         "Start pipe DN task {} successfully within {} ms",
         this,
@@ -91,9 +91,9 @@ public class PipeDataNodeTask implements PipeTask {
   @Override
   public void stop() {
     final long startTime = System.currentTimeMillis();
-    extractorStage.stop();
+    sourceStage.stop();
     processorStage.stop();
-    connectorStage.stop();
+    sinkStage.stop();
     LOGGER.info(
         "Stop pipe DN task {} successfully within {} ms",
         this,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
index 9a88ad74d7c..4abc8daed30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
@@ -31,7 +31,7 @@ public class PipeSinkSubtaskExecutor extends 
PipeSubtaskExecutor {
   public PipeSinkSubtaskExecutor() {
     super(
         PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
-        ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL.getName() + "-" + id.get(),
+        ThreadName.PIPE_SINK_EXECUTOR_POOL.getName() + "-" + id.get(),
         ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName() + "-" + 
id.getAndIncrement(),
         true);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index ddc194716d2..2809ec9ec93 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -54,8 +54,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
    * @param creationTime pipe creation time
    * @param pipeProcessorParameters used to create {@link PipeProcessor}
    * @param regionId {@link DataRegion} id
-   * @param pipeExtractorInputEventSupplier used to input {@link Event}s from 
{@link PipeExtractor}
-   * @param pipeConnectorOutputPendingQueue used to output {@link Event}s to 
{@link PipeConnector}
+   * @param pipeSourceInputEventSupplier used to input {@link Event}s from 
{@link PipeExtractor}
+   * @param pipeSinkOutputPendingQueue used to output {@link Event}s to {@link 
PipeConnector}
    * @throws PipeException if failed to {@link 
PipeProcessor#validate(PipeParameterValidator)} or
    *     {@link PipeProcessor#customize(PipeParameters, 
PipeProcessorRuntimeConfiguration)}}
    */
@@ -64,8 +64,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       final long creationTime,
       final PipeParameters pipeProcessorParameters,
       final int regionId,
-      final EventSupplier pipeExtractorInputEventSupplier,
-      final UnboundedBlockingPendingQueue<Event> 
pipeConnectorOutputPendingQueue,
+      final EventSupplier pipeSourceInputEventSupplier,
+      final UnboundedBlockingPendingQueue<Event> pipeSinkOutputPendingQueue,
       final PipeProcessorSubtaskExecutor executor,
       final PipeTaskMeta pipeTaskMeta,
       final boolean forceTabletFormat,
@@ -99,9 +99,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     // removed, the new subtask will have the same pipeName and regionId as the
     // old one, so we need creationTime to make their hash code different in 
the map.
     final String taskId = pipeName + "_" + regionId + "_" + creationTime;
-    final PipeEventCollector pipeConnectorOutputEventCollector =
+    final PipeEventCollector pipeSinkOutputEventCollector =
         new PipeEventCollector(
-            pipeConnectorOutputPendingQueue,
+            pipeSinkOutputPendingQueue,
             creationTime,
             regionId,
             forceTabletFormat,
@@ -112,9 +112,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             pipeName,
             creationTime,
             regionId,
-            pipeExtractorInputEventSupplier,
+            pipeSourceInputEventSupplier,
             pipeProcessor,
-            pipeConnectorOutputEventCollector);
+            pipeSinkOutputEventCollector);
 
     this.executor = executor;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index a22fbb536d7..88eac560cde 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -38,7 +38,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
   protected final int regionId;
   protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
 
-  protected String connectorSubtaskId;
+  protected String sinkSubtaskId;
 
   public PipeTaskSinkStage(
       String pipeName,
@@ -56,7 +56,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
   }
 
   protected void registerSubtask() {
-    this.connectorSubtaskId =
+    this.sinkSubtaskId =
         PipeSinkSubtaskManager.instance()
             .register(
                 executor,
@@ -71,21 +71,20 @@ public class PipeTaskSinkStage extends PipeTaskStage {
 
   @Override
   public void startSubtask() throws PipeException {
-    PipeSinkSubtaskManager.instance().start(connectorSubtaskId);
+    PipeSinkSubtaskManager.instance().start(sinkSubtaskId);
   }
 
   @Override
   public void stopSubtask() throws PipeException {
-    PipeSinkSubtaskManager.instance().stop(connectorSubtaskId);
+    PipeSinkSubtaskManager.instance().stop(sinkSubtaskId);
   }
 
   @Override
   public void dropSubtask() throws PipeException {
-    PipeSinkSubtaskManager.instance()
-        .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
+    PipeSinkSubtaskManager.instance().deregister(pipeName, creationTime, 
regionId, sinkSubtaskId);
   }
 
   public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
-    return 
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
+    return 
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(sinkSubtaskId);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
index 6acc0fc3d4a..240b5499e92 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
@@ -44,32 +44,31 @@ public class PipeTaskSourceStage extends PipeTaskStage {
   public PipeTaskSourceStage(
       String pipeName,
       long creationTime,
-      PipeParameters extractorParameters,
+      PipeParameters sourceParameters,
       int regionId,
       PipeTaskMeta pipeTaskMeta) {
     pipeExtractor =
         StorageEngine.getInstance().getAllDataRegionIds().contains(new 
DataRegionId(regionId))
-            ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSource(extractorParameters)
-            : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(extractorParameters);
+            ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSource(sourceParameters)
+            : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(sourceParameters);
 
-    // Validate and customize should be called before createSubtask. this 
allows extractor exposing
+    // Validate and customize should be called before createSubtask. this 
allows source exposing
     // exceptions in advance.
     try {
-      // 1. Validate extractor parameters
-      pipeExtractor.validate(new PipeParameterValidator(extractorParameters));
+      // 1. Validate source parameters
+      pipeExtractor.validate(new PipeParameterValidator(sourceParameters));
 
-      // 2. Customize extractor
+      // 2. Customize source
       final PipeTaskRuntimeConfiguration runtimeConfiguration =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(pipeName, creationTime, 
regionId, pipeTaskMeta));
-      pipeExtractor.customize(extractorParameters, runtimeConfiguration);
+      pipeExtractor.customize(sourceParameters, runtimeConfiguration);
     } catch (Exception e) {
       try {
         pipeExtractor.close();
       } catch (Exception closeException) {
         LOGGER.warn(
-            "Failed to close extractor after failed to initialize extractor. "
-                + "Ignore this exception.",
+            "Failed to close source after failed to initialize source. " + 
"Ignore this exception.",
             closeException);
       }
       throw new PipeException(e.getMessage(), e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
index 8280709446a..6b7787ba162 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
@@ -47,13 +47,13 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
-        "IoTDBSchemaRegionAirGapConnector can't transfer 
TabletInsertionEvent.");
+        "IoTDBSchemaRegionAirGapSink can't transfer TabletInsertionEvent.");
   }
 
   @Override
   public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
-        "IoTDBSchemaRegionAirGapConnector can't transfer 
TsFileInsertionEvent.");
+        "IoTDBSchemaRegionAirGapSink can't transfer TsFileInsertionEvent.");
   }
 
   @Override
@@ -68,8 +68,7 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
         doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event);
       } else if (!(event instanceof PipeHeartbeatEvent)) {
         LOGGER.warn(
-            "IoTDBSchemaRegionAirGapConnector does not support transferring 
generic event: {}.",
-            event);
+            "IoTDBSchemaRegionAirGapSink does not support transferring generic 
event: {}.", event);
       }
     } catch (final IOException e) {
       isSocketAlive.set(socketIndex, false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 110d3cb6450..8bcb9d47009 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -69,7 +69,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
 
   public void transfer(final AsyncPipeDataTransferServiceClient client) throws 
TException {
     for (final Map.Entry<Pair<String, Long>, Long> entry : 
pipeName2BytesAccumulated.entrySet()) {
-      connector.rateLimitIfNeeded(
+      sink.rateLimitIfNeeded(
           entry.getKey().getLeft(),
           entry.getKey().getRight(),
           client.getEndPoint(),
@@ -92,13 +92,11 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
       // Only handle the failed statuses to avoid string format performance 
overhead
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
           && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-        connector
-            .statusHandler()
-            .handle(status, response.getStatus().getMessage(), 
events.toString());
+        sink.statusHandler().handle(status, response.getStatus().getMessage(), 
events.toString());
       }
       for (final Pair<String, TEndPoint> redirectPair :
           LeaderCacheUtils.parseRecommendedRedirections(status)) {
-        connector.updateLeaderCache(redirectPair.getLeft(), 
redirectPair.getRight());
+        sink.updateLeaderCache(redirectPair.getLeft(), 
redirectPair.getRight());
       }
 
       events.forEach(
@@ -123,7 +121,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
           events.size(),
           
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
     } finally {
-      connector.addFailureEventsToRetryQueue(events, exception);
+      sink.addFailureEventsToRetryQueue(events, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 70ba7f4cfc5..912a1e724f7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -46,7 +46,7 @@ public class PipeTransferTabletInsertNodeEventHandler
 
   @Override
   protected void updateLeaderCache(final TSStatus status) {
-    connector.updateLeaderCache(
+    sink.updateLeaderCache(
         ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), 
status.getRedirectNode());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 66a1f4a013b..a8f1136b897 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -54,7 +54,7 @@ public abstract class PipeTransferTabletInsertionEventHandler 
extends PipeTransf
 
   public void transfer(final AsyncPipeDataTransferServiceClient client) throws 
TException {
     if (event instanceof EnrichedEvent) {
-      connector.rateLimitIfNeeded(
+      sink.rateLimitIfNeeded(
           ((EnrichedEvent) event).getPipeName(),
           ((EnrichedEvent) event).getCreationTime(),
           client.getEndPoint(),
@@ -77,8 +77,7 @@ public abstract class PipeTransferTabletInsertionEventHandler 
extends PipeTransf
       // Only handle the failed statuses to avoid string format performance 
overhead
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
           && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-        connector
-            .statusHandler()
+        sink.statusHandler()
             .handle(response.getStatus(), response.getStatus().getMessage(), 
event.toString());
       }
       if (event instanceof EnrichedEvent) {
@@ -109,7 +108,7 @@ public abstract class 
PipeTransferTabletInsertionEventHandler extends PipeTransf
           event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null,
           event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitIds() : null);
     } finally {
-      connector.addFailureEventToRetryQueue(event, exception);
+      sink.addFailureEventToRetryQueue(event, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index ff1daa05c28..b64e446827a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -32,8 +32,8 @@ public class PipeTransferTabletRawEventHandler extends 
PipeTransferTabletInserti
   public PipeTransferTabletRawEventHandler(
       final PipeRawTabletInsertionEvent event,
       final TPipeTransferReq req,
-      final IoTDBDataRegionAsyncSink connector) {
-    super(event, req, connector);
+      final IoTDBDataRegionAsyncSink sink) {
+    super(event, req, sink);
   }
 
   @Override
@@ -45,7 +45,7 @@ public class PipeTransferTabletRawEventHandler extends 
PipeTransferTabletInserti
 
   @Override
   protected void updateLeaderCache(final TSStatus status) {
-    connector.updateLeaderCache(
+    sink.updateLeaderCache(
         ((PipeRawTabletInsertionEvent) event).getDeviceId(), 
status.getRedirectNode());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 21f7c144bed..a8b4a3b7a79 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -36,18 +36,18 @@ public abstract class PipeTransferTrackableHandler
     implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
 
-  protected final IoTDBDataRegionAsyncSink connector;
+  protected final IoTDBDataRegionAsyncSink sink;
   protected volatile AsyncPipeDataTransferServiceClient client;
 
-  public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink 
connector) {
-    this.connector = connector;
+  public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) {
+    this.sink = sink;
   }
 
   @Override
   public void onComplete(final TPipeTransferResp response) {
-    if (connector.isClosed()) {
+    if (sink.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this, true);
+      sink.eliminateHandler(this, true);
       return;
     }
 
@@ -56,7 +56,7 @@ public abstract class PipeTransferTrackableHandler
       // completed
       // NOTE: We should not clear the reference count of events, as this 
would cause the
       // 
`org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3`
 test to fail.
-      connector.eliminateHandler(this, false);
+      sink.eliminateHandler(this, false);
     }
   }
 
@@ -67,14 +67,14 @@ public abstract class PipeTransferTrackableHandler
       client.setPrintLogWhenEncounterException(false);
     }
 
-    if (connector.isClosed()) {
+    if (sink.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this, true);
+      sink.eliminateHandler(this, true);
       return;
     }
 
     onErrorInternal(exception);
-    connector.eliminateHandler(this, false);
+    sink.eliminateHandler(this, false);
   }
 
   /**
@@ -93,10 +93,10 @@ public abstract class PipeTransferTrackableHandler
       this.client = client;
     }
     // track handler before checking if connector is closed
-    connector.trackHandler(this);
-    if (connector.isClosed()) {
+    sink.trackHandler(this);
+    if (sink.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this, true);
+      sink.eliminateHandler(this, true);
       client.setShouldReturnSelf(true);
       client.returnSelf(
           (e) -> {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index a9426ed7b8b..6742199ac10 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -159,7 +159,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     if (client == null) {
       LOGGER.warn(
           "Client has been returned to the pool. Current handler status is {}. 
Will not transfer {}.",
-          connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+          sink.isClosed() ? "CLOSED" : "NOT CLOSED",
           tsFile);
       return;
     }
@@ -168,7 +168,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     client.setTimeoutDynamically(clientManager.getConnectionTimeout());
 
     PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize);
-    if (connector.isEnableSendTsFileLimit()) {
+    if (sink.isEnableSendTsFileLimit()) {
       TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize);
     }
     final int readLength = reader.read(readBuffer);
@@ -192,11 +192,11 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
                 ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                     modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length())
                 : 
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
-        final TPipeTransferReq req = 
connector.compressIfNeeded(uncompressedReq);
+        final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
 
         pipeName2WeightMap.forEach(
             (pipePair, weight) ->
-                connector.rateLimitIfNeeded(
+                sink.rateLimitIfNeeded(
                     pipePair.getLeft(),
                     pipePair.getRight(),
                     client.getEndPoint(),
@@ -219,11 +219,11 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
                 currentFile.getName(), position, payload)
             : PipeTransferTsFilePieceReq.toTPipeTransferReq(
                 currentFile.getName(), position, payload);
-    final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
+    final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
 
     pipeName2WeightMap.forEach(
         (pipePair, weight) ->
-            connector.rateLimitIfNeeded(
+            sink.rateLimitIfNeeded(
                 pipePair.getLeft(),
                 pipePair.getRight(),
                 client.getEndPoint(),
@@ -241,7 +241,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     try {
       super.onComplete(response);
     } finally {
-      if (connector.isClosed()) {
+      if (sink.isClosed()) {
         returnClientIfNecessary();
       }
     }
@@ -255,8 +255,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         // Only handle the failed statuses to avoid string format performance 
overhead
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-          connector
-              .statusHandler()
+          sink.statusHandler()
               .handle(
                   status,
                   String.format(
@@ -330,9 +329,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         // Only handle the failed statuses to avoid string format performance 
overhead
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-          connector
-              .statusHandler()
-              .handle(status, response.getStatus().getMessage(), 
tsFile.getName());
+          sink.statusHandler().handle(status, 
response.getStatus().getMessage(), tsFile.getName());
         }
       }
 
@@ -404,7 +401,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         returnClientIfNecessary();
       } finally {
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
-          connector.addFailureEventsToRetryQueue(events, exception);
+          sink.addFailureEventsToRetryQueue(events, exception);
         }
       }
     }
@@ -415,7 +412,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       return;
     }
 
-    if (connector.isClosed()) {
+    if (sink.isClosed()) {
       closeClient();
     }
 
@@ -439,7 +436,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     if (client == null) {
       LOGGER.warn(
           "Client has been returned to the pool. Current handler status is {}. 
Will not transfer {}.",
-          connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+          sink.isClosed() ? "CLOSED" : "NOT CLOSED",
           tsFile);
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 52ac137ae4e..2019eba8560 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -35,13 +35,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.function.Consumer;
 
-import static 
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
+import static 
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_SOURCE_DISRUPTOR;
 
 public class DisruptorQueue {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DisruptorQueue.class);
   private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
-      new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
+      new IoTDBDaemonThreadFactory(PIPE_SOURCE_DISRUPTOR.getName());
 
   private final PipeMemoryBlock allocatedMemoryBlock;
   private final Disruptor<EventContainer> disruptor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index aaa98220178..882d4aff0d8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -41,34 +41,34 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * <p>All events extracted by this listener will be first published to 
different
  * PipeEventDataRegionAssigners (identified by data region id), and then 
PipeEventDataRegionAssigner
- * will filter events and assign them to different 
PipeRealtimeEventDataRegionExtractors.
+ * will filter events and assign them to different 
PipeRealtimeEventDataRegionSources.
  */
 public class PipeInsertionDataNodeListener {
 
   private final ConcurrentMap<String, PipeDataRegionAssigner> 
dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
-  private final AtomicInteger listenToTsFileExtractorCount = new 
AtomicInteger(0);
-  private final AtomicInteger listenToInsertNodeExtractorCount = new 
AtomicInteger(0);
+  private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
+  private final AtomicInteger listenToInsertNodeSourceCount = new 
AtomicInteger(0);
 
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
-      String dataRegionId, PipeRealtimeDataRegionSource extractor) {
+      final String dataRegionId, final PipeRealtimeDataRegionSource source) {
     dataRegionId2Assigner
         .computeIfAbsent(dataRegionId, o -> new 
PipeDataRegionAssigner(dataRegionId))
-        .startAssignTo(extractor);
+        .startAssignTo(source);
 
-    if (extractor.isNeedListenToTsFile()) {
-      listenToTsFileExtractorCount.incrementAndGet();
+    if (source.isNeedListenToTsFile()) {
+      listenToTsFileSourceCount.incrementAndGet();
     }
-    if (extractor.isNeedListenToInsertNode()) {
-      listenToInsertNodeExtractorCount.incrementAndGet();
+    if (source.isNeedListenToInsertNode()) {
+      listenToInsertNodeSourceCount.incrementAndGet();
     }
   }
 
   public synchronized void stopListenAndAssign(
-      final String dataRegionId, final PipeRealtimeDataRegionSource extractor) 
{
+      final String dataRegionId, final PipeRealtimeDataRegionSource source) {
     PipeDataRegionAssigner assignerToClose = null;
 
     synchronized (this) {
@@ -77,13 +77,13 @@ public class PipeInsertionDataNodeListener {
         return;
       }
 
-      assigner.stopAssignTo(extractor);
+      assigner.stopAssignTo(source);
 
-      if (extractor.isNeedListenToTsFile()) {
-        listenToTsFileExtractorCount.decrementAndGet();
+      if (source.isNeedListenToTsFile()) {
+        listenToTsFileSourceCount.decrementAndGet();
       }
-      if (extractor.isNeedListenToInsertNode()) {
-        listenToInsertNodeExtractorCount.decrementAndGet();
+      if (source.isNeedListenToInsertNode()) {
+        listenToInsertNodeSourceCount.decrementAndGet();
       }
 
       if (assigner.notMoreSourceNeededToBeAssigned()) {
@@ -104,8 +104,8 @@ public class PipeInsertionDataNodeListener {
 
   public void listenToTsFile(
       final String dataRegionId, final TsFileResource tsFileResource, final 
boolean isLoaded) {
-    // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on 
purpose
-    // because extractors may use tsfile events when some exceptions occur in 
the
+    // We don't judge whether listenToTsFileSourceCount.get() == 0 here on 
purpose
+    // because sources may use tsfile events when some exceptions occur in the
     // insert nodes listening process.
 
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
@@ -120,8 +120,8 @@ public class PipeInsertionDataNodeListener {
   }
 
   public void listenToInsertNode(
-      String dataRegionId, InsertNode insertNode, TsFileResource 
tsFileResource) {
-    if (listenToInsertNodeExtractorCount.get() == 0) {
+      final String dataRegionId, final InsertNode insertNode, final 
TsFileResource tsFileResource) {
+    if (listenToInsertNodeSourceCount.get() == 0) {
       return;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index a366aed71b3..1deb5515eb1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -1076,68 +1076,184 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     return visitor.visitInsertTablet(this, context);
   }
 
-  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+  public TimeValuePair composeLastTimeValuePair(
+      int measurementIndex, int startOffset, int endOffset) {
     if (measurementIndex >= columns.length || 
Objects.isNull(dataTypes[measurementIndex])) {
       return null;
     }
 
     // get non-null value
-    int lastIdx = rowCount - 1;
+    int lastIdx = Math.min(endOffset - 1, rowCount - 1);
     if (bitMaps != null && bitMaps[measurementIndex] != null) {
       BitMap bitMap = bitMaps[measurementIndex];
-      while (lastIdx >= 0) {
+      while (lastIdx >= startOffset) {
         if (!bitMap.isMarked(lastIdx)) {
           break;
         }
         lastIdx--;
       }
     }
-    if (lastIdx < 0) {
+    if (lastIdx < startOffset) {
+      return null;
+    }
+    return composeTimeValuePair(measurementIndex, lastIdx);
+  }
+
+  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+    return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+  }
+
+  protected TimeValuePair composeLastTimeValuePair(
+      final int measurementIndex,
+      final TSStatus[] results,
+      final int startOffset,
+      final int endOffset) {
+    if (results == null) {
+      return composeLastTimeValuePair(measurementIndex, startOffset, 
endOffset);
+    }
+    if (measurementIndex >= columns.length || 
Objects.isNull(dataTypes[measurementIndex])) {
       return null;
     }
 
+    final BitMap bitMap = bitMaps == null ? null : bitMaps[measurementIndex];
+    int lastIdx = Math.min(endOffset - 1, rowCount - 1);
+    while (lastIdx >= startOffset) {
+      if (results[lastIdx] != null
+          && results[lastIdx].getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        lastIdx--;
+        continue;
+      }
+      if (bitMap != null && bitMap.isMarked(lastIdx)) {
+        lastIdx--;
+        continue;
+      }
+      break;
+    }
+    return lastIdx < startOffset ? null : 
composeTimeValuePair(measurementIndex, lastIdx);
+  }
+
+  private TimeValuePair composeTimeValuePair(final int measurementIndex, final 
int rowIndex) {
     TsPrimitiveType value;
     switch (dataTypes[measurementIndex]) {
       case INT32:
       case DATE:
         int[] intValues = (int[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsInt(intValues[lastIdx]);
+        value = new TsPrimitiveType.TsInt(intValues[rowIndex]);
         break;
       case INT64:
       case TIMESTAMP:
         long[] longValues = (long[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsLong(longValues[lastIdx]);
+        value = new TsPrimitiveType.TsLong(longValues[rowIndex]);
         break;
       case FLOAT:
         float[] floatValues = (float[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]);
+        value = new TsPrimitiveType.TsFloat(floatValues[rowIndex]);
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]);
+        value = new TsPrimitiveType.TsDouble(doubleValues[rowIndex]);
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]);
+        value = new TsPrimitiveType.TsBoolean(boolValues[rowIndex]);
         break;
       case TEXT:
       case BLOB:
       case STRING:
         Binary[] binaryValues = (Binary[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
+        value = new TsPrimitiveType.TsBinary(binaryValues[rowIndex]);
         break;
       default:
         throw new UnSupportedDataTypeException(
             String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
     }
-    return new TimeValuePair(times[lastIdx], value);
+    return new TimeValuePair(times[rowIndex], value);
+  }
+
+  public IDeviceID getDeviceID(int rowIdx) {
+    if (deviceID != null) {
+      return deviceID;
+    }
+    deviceID = DeviceIDFactory.getInstance().getDeviceID(targetPath);
+    return deviceID;
+  }
+
+  protected static class PartitionSplitInfo {
+
+    // each List in ranges holds boundary pairs: range1.start, range1.end, range2.start, range2.end, ...
+    List<Integer> ranges = new ArrayList<>();
+    List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
+    List<TRegionReplicaSet> replicaSets;
+  }
+
+  /**
+   * Split the tablet within the given range according to the table device ID.
+   *
+   * @param start inclusive
+   * @param end exclusive
+   * @return a list of pairs; the number in each pair is the end offset of a device
+   */
+  public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
+    return Collections.singletonList(new Pair<>(getDeviceID(), end));
+  }
+
+  /**
+   * @param results insertion result of each row
+   * @param ttl the ttl
+   * @return the position of the first alive row
+   * @throws OutOfTTLException if all rows have expired the TTL
+   */
+  public int checkTTL(TSStatus[] results, long ttl) throws OutOfTTLException {
+    return checkTTLInternal(results, ttl, true);
+  }
+
+  protected int checkTTLInternal(TSStatus[] results, long ttl, boolean 
breakOnFirstAlive)
+      throws OutOfTTLException {
+
+    /*
+     * assume that batch has been sorted by client
+     */
+    int loc = 0;
+    int firstAliveLoc = -1;
+    while (loc < getRowCount()) {
+      long currTime = getTimes()[loc];
+      // skip points that do not satisfy TTL
+      if (!isAlive(currTime, ttl)) {
+        results[loc] =
+            RpcUtils.getStatus(
+                TSStatusCode.OUT_OF_TTL,
+                String.format(
+                    "Insertion time [%s] is less than ttl time bound [%s]",
+                    DateTimeUtils.convertLongToDate(currTime),
+                    
DateTimeUtils.convertLongToDate(CommonDateTimeUtils.currentTime() - ttl)));
+      } else {
+        if (firstAliveLoc == -1) {
+          firstAliveLoc = loc;
+        }
+        if (breakOnFirstAlive) {
+          break;
+        }
+      }
+      loc++;
+    }
+
+    if (firstAliveLoc == -1) {
+      // no alive data
+      throw new OutOfTTLException(
+          getTimes()[getTimes().length - 1], 
(CommonDateTimeUtils.currentTime() - ttl));
+    }
+    return firstAliveLoc;
   }
 
   public void updateLastCache(final String databaseName) {
+    updateLastCache(databaseName, null);
+  }
+
+  public void updateLastCache(final String databaseName, final TSStatus[] 
results) {
     final String[] rawMeasurements = getRawMeasurements();
     final TimeValuePair[] timeValuePairs = new 
TimeValuePair[rawMeasurements.length];
     for (int i = 0; i < rawMeasurements.length; i++) {
-      timeValuePairs[i] = composeLastTimeValuePair(i);
+      timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount);
     }
     DataNodeSchemaCache.getInstance()
         .updateLastCacheIfExists(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3cad57962e5..791d01f9709 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1193,7 +1193,7 @@ public class DataRegion implements IDataRegionForQuery {
         if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
           // disable updating last cache on follower
           startTime = System.nanoTime();
-          tryToUpdateInsertTabletLastCache(insertTabletNode);
+          tryToUpdateInsertTabletLastCache(insertTabletNode, results);
           PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
               System.nanoTime() - startTime);
         }
@@ -1261,13 +1261,13 @@ public class DataRegion implements IDataRegionForQuery {
       return true;
     }
 
-    TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, sequence);
-    if (tsFileProcessor == null) {
+    final TsFileProcessor tsFileProcessor;
+    try {
+      tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
+    } catch (WriteProcessException e) {
+      final TSStatus failureStatus = RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage());
       for (int i = start; i < end; i++) {
-        results[i] =
-            RpcUtils.getStatus(
-                TSStatusCode.INTERNAL_SERVER_ERROR,
-                "can not create TsFileProcessor, timePartitionId: " + 
timePartitionId);
+        results[i] = failureStatus;
       }
       return false;
     }
@@ -1289,8 +1289,9 @@ public class DataRegion implements IDataRegionForQuery {
     return true;
   }
 
-  private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
-    node.updateLastCache(getDatabaseName());
+  private void tryToUpdateInsertTabletLastCache(
+      final InsertTabletNode node, final TSStatus[] results) {
+    node.updateLastCache(getDatabaseName(), results);
   }
 
   private TsFileProcessor insertToTsFileProcessor(
@@ -1300,9 +1301,6 @@ public class DataRegion implements IDataRegionForQuery {
       return null;
     }
     TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, sequence);
-    if (tsFileProcessor == null) {
-      return null;
-    }
     long[] costsForMetrics = new long[4];
     tsFileProcessor.insert(insertRowNode, costsForMetrics);
     
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
@@ -1325,9 +1323,11 @@ public class DataRegion implements IDataRegionForQuery {
       if (insertRowNode.allMeasurementFailed()) {
         continue;
       }
-      TsFileProcessor tsFileProcessor =
-          getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
-      if (tsFileProcessor == null) {
+      final TsFileProcessor tsFileProcessor;
+      try {
+        tsFileProcessor = getOrCreateTsFileProcessor(timePartitionIds[i], 
areSequence[i]);
+      } catch (WriteProcessException e) {
+        insertRowsNode.getResults().put(i, 
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
         continue;
       }
       int finalI = i;
@@ -1335,43 +1335,28 @@ public class DataRegion implements IDataRegionForQuery {
           tsFileProcessor,
           (k, v) -> {
             if (v == null) {
-              v = new InsertRowsNode(insertRowsNode.getPlanNodeId());
-              v.setSearchIndex(insertRowNode.getSearchIndex());
-              v.setAligned(insertRowNode.isAligned());
-              if (insertRowNode.isGeneratedByPipe()) {
-                v.markAsGeneratedByPipe();
-              }
-              if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-                v.markAsGeneratedByRemoteConsensusLeader();
-              }
+              v = createGroupedInsertRowsNode(insertRowsNode, insertRowNode);
             }
-            if (v.isAligned() != insertRowNode.isAligned()) {
-              v.setMixingAlignment(true);
-            }
-            v.addOneInsertRowNode(insertRowNode, finalI);
-            v.updateProgressIndex(insertRowNode.getProgressIndex());
+            appendInsertRowNode(v, insertRowNode, finalI);
             return v;
           });
     }
 
     List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
     for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
-      TsFileProcessor tsFileProcessor = entry.getKey();
       InsertRowsNode subInsertRowsNode = entry.getValue();
       try {
-        tsFileProcessor.insert(subInsertRowsNode, costsForMetrics);
+        List<TsFileProcessor> insertedProcessors =
+            insertRowsWithTypeConsistencyCheck(entry.getKey(), 
subInsertRowsNode, costsForMetrics);
+        
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+        for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+          // check memtable size and may asyncTryToFlush the work memtable
+          if (tsFileProcessor.shouldFlush()) {
+            fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+          }
+        }
       } catch (WriteProcessException e) {
-        insertRowsNode
-            .getResults()
-            .put(
-                subInsertRowsNode.getInsertRowNodeIndexList().get(0),
-                RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
-      }
-      
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
-      // check memtable size and may asyncTryToFlush the work memtable
-      if (entry.getKey().shouldFlush()) {
-        fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+        recordInsertRowsFailure(insertRowsNode, subInsertRowsNode, e);
       }
     }
 
@@ -1382,6 +1367,127 @@ public class DataRegion implements IDataRegionForQuery {
     return executedInsertRowNodeList;
   }
 
+  private List<TsFileProcessor> insertRowsWithTypeConsistencyCheck(
+      TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode, 
long[] costsForMetrics)
+      throws WriteProcessException {
+    try {
+      tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics);
+      return Collections.singletonList(tsFileProcessor);
+    } catch (DataTypeInconsistentException e) {
+      InsertRowNode firstRow = subInsertRowsNode.getInsertRowNodeList().get(0);
+      long timePartitionId = 
TimePartitionUtils.getTimePartitionId(firstRow.getTime());
+      // flush both MemTables so that the new type can be inserted into a new 
MemTable
+      flushWorkingProcessorsForTimePartition(timePartitionId);
+      return retryInsertRowsAfterFlush(subInsertRowsNode, timePartitionId, 
costsForMetrics);
+    }
+  }
+
+  private InsertRowsNode createGroupedInsertRowsNode(
+      final InsertRowsNode sourceInsertRowsNode, final InsertRowNode 
firstInsertRowNode) {
+    final InsertRowsNode groupedInsertRowsNode = 
sourceInsertRowsNode.emptyClone();
+    initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+    return groupedInsertRowsNode;
+  }
+
+  private InsertRowsNode createGroupedInsertRowsNode(
+      final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+      final InsertRowNode firstInsertRowNode) {
+    final InsertRowsNode groupedInsertRowsNode =
+        new InsertRowsNode(sourceInsertRowsNode.getPlanNodeId());
+    initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+    return groupedInsertRowsNode;
+  }
+
+  private void initializeGroupedInsertRowsNode(
+      final InsertRowsNode groupedInsertRowsNode, final InsertRowNode 
firstInsertRowNode) {
+    groupedInsertRowsNode.setSearchIndex(firstInsertRowNode.getSearchIndex());
+    groupedInsertRowsNode.setAligned(firstInsertRowNode.isAligned());
+    if (firstInsertRowNode.isGeneratedByPipe()) {
+      groupedInsertRowsNode.markAsGeneratedByPipe();
+    }
+    if (firstInsertRowNode.isGeneratedByRemoteConsensusLeader()) {
+      groupedInsertRowsNode.markAsGeneratedByRemoteConsensusLeader();
+    }
+  }
+
+  private void appendInsertRowNode(
+      final InsertRowsNode groupedInsertRowsNode,
+      final InsertRowNode insertRowNode,
+      final int insertRowNodeIndex) {
+    if (groupedInsertRowsNode.isAligned() != insertRowNode.isAligned()) {
+      groupedInsertRowsNode.setMixingAlignment(true);
+    }
+    groupedInsertRowsNode.addOneInsertRowNode(insertRowNode, 
insertRowNodeIndex);
+    
groupedInsertRowsNode.updateProgressIndex(insertRowNode.getProgressIndex());
+  }
+
+  private void flushWorkingProcessorsForTimePartition(final long 
timePartitionId) {
+    TsFileProcessor workSequenceProcessor = 
workSequenceTsFileProcessors.get(timePartitionId);
+    if (workSequenceProcessor != null) {
+      fileFlushPolicy.apply(this, workSequenceProcessor, 
workSequenceProcessor.isSequence());
+    }
+    TsFileProcessor workUnsequenceProcessor = 
workUnsequenceTsFileProcessors.get(timePartitionId);
+    if (workUnsequenceProcessor != null) {
+      fileFlushPolicy.apply(this, workUnsequenceProcessor, 
workUnsequenceProcessor.isSequence());
+    }
+  }
+
+  private List<TsFileProcessor> retryInsertRowsAfterFlush(
+      final InsertRowsNode subInsertRowsNode,
+      final long timePartitionId,
+      final long[] costsForMetrics)
+      throws WriteProcessException {
+    final Map<TsFileProcessor, InsertRowsNode> retriedProcessorMap = new 
HashMap<>();
+    for (int i = 0; i < subInsertRowsNode.getInsertRowNodeList().size(); i++) {
+      final InsertRowNode insertRowNode = 
subInsertRowsNode.getInsertRowNodeList().get(i);
+      final boolean isSequence =
+          config.isEnableSeparateData()
+              && insertRowNode.getTime()
+                  > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
+      final TsFileProcessor retriedProcessor =
+          getOrCreateTsFileProcessor(timePartitionId, isSequence);
+      final int insertRowNodeIndex = 
subInsertRowsNode.getInsertRowNodeIndexList().get(i);
+      retriedProcessorMap.compute(
+          retriedProcessor,
+          (k, v) -> {
+            if (v == null) {
+              v = createGroupedInsertRowsNode(subInsertRowsNode, 
insertRowNode);
+            }
+            appendInsertRowNode(v, insertRowNode, insertRowNodeIndex);
+            return v;
+          });
+    }
+
+    final List<TsFileProcessor> insertedProcessors = new 
ArrayList<>(retriedProcessorMap.size());
+    for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry : 
retriedProcessorMap.entrySet()) {
+      final TsFileProcessor retriedProcessor = retriedEntry.getKey();
+      retriedProcessor.insertRows(retriedEntry.getValue(), costsForMetrics);
+      insertedProcessors.add(retriedProcessor);
+    }
+    return insertedProcessors;
+  }
+
+  private void recordInsertRowsFailure(
+      final InsertRowsNode sourceInsertRowsNode,
+      final InsertRowsNode failedInsertRowsNode,
+      final WriteProcessException exception) {
+    final TSStatus failureStatus =
+        RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+    for (Integer failedIndex : 
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+      sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
+    }
+  }
+
+  private void recordInsertRowsFailure(
+      final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+      final InsertRowsNode failedInsertRowsNode,
+      final WriteProcessException exception) {
+    final TSStatus failureStatus =
+        RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+    for (Integer failedIndex : 
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+      sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
+    }
+  }
   private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
     for (InsertRowNode node : nodeList) {
       node.updateLastCache(databaseName);
@@ -1440,7 +1546,8 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean 
sequence) {
+  protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, 
boolean sequence)
+      throws WriteProcessException {
     TsFileProcessor tsFileProcessor = null;
     int retryCnt = 0;
     do {
@@ -1466,7 +1573,7 @@ public class DataRegion implements IDataRegionForQuery {
             "disk space is insufficient when creating TsFile processor, change 
system mode to read-only",
             e);
         
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
-        break;
+        throw new WriteProcessException(e.getMessage(), e.getErrorCode(), 
true);
       } catch (IOException e) {
         if (retryCnt < 3) {
           logger.warn("meet IOException when creating TsFileProcessor, retry 
it again", e);
@@ -1475,11 +1582,15 @@ public class DataRegion implements IDataRegionForQuery {
           logger.error(
               "meet IOException when creating TsFileProcessor, change system 
mode to error", e);
           
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
-          break;
+          throw new WriteProcessException(
+              String.format(
+                  "Failed to create TsFileProcessor for database %s, 
timePartitionId %s",
+                  databaseName, timeRangeId),
+              e);
         }
       } catch (ExceedQuotaException e) {
         logger.error(e.getMessage());
-        break;
+        throw new WriteProcessException(e.getMessage(), e.getErrorCode(), 
true);
       }
     } while (tsFileProcessor == null);
     return tsFileProcessor;
@@ -3663,8 +3774,13 @@ public class DataRegion implements IDataRegionForQuery {
             config.isEnableSeparateData()
                 && insertRowNode.getTime()
                     > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
-        TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, isSequence);
-        if (tsFileProcessor == null) {
+        final TsFileProcessor tsFileProcessor;
+        try {
+          tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, 
isSequence);
+        } catch (WriteProcessException e) {
+          insertRowsOfOneDeviceNode
+              .getResults()
+              .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
           continue;
         }
         int finalI = i;
@@ -3672,39 +3788,27 @@ public class DataRegion implements IDataRegionForQuery {
             tsFileProcessor,
             (k, v) -> {
               if (v == null) {
-                v = new 
InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId());
-                v.setSearchIndex(insertRowNode.getSearchIndex());
-                v.setAligned(insertRowNode.isAligned());
-                if (insertRowNode.isGeneratedByPipe()) {
-                  v.markAsGeneratedByPipe();
-                }
-                if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-                  v.markAsGeneratedByRemoteConsensusLeader();
-                }
+                v = createGroupedInsertRowsNode(insertRowsOfOneDeviceNode, 
insertRowNode);
               }
-              v.addOneInsertRowNode(insertRowNode, finalI);
-              v.updateProgressIndex(insertRowNode.getProgressIndex());
+              appendInsertRowNode(v, insertRowNode, finalI);
               return v;
             });
       }
       List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
       for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
-        TsFileProcessor tsFileProcessor = entry.getKey();
         InsertRowsNode subInsertRowsNode = entry.getValue();
         try {
-          tsFileProcessor.insert(subInsertRowsNode, costsForMetrics);
+          List<TsFileProcessor> insertedProcessors =
+              insertRowsWithTypeConsistencyCheck(entry.getKey(), 
subInsertRowsNode, costsForMetrics);
+          
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+          for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+            // check memtable size and may asyncTryToFlush the work memtable
+            if (tsFileProcessor.shouldFlush()) {
+              fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+            }
+          }
         } catch (WriteProcessException e) {
-          insertRowsOfOneDeviceNode
-              .getResults()
-              .put(
-                  subInsertRowsNode.getInsertRowNodeIndexList().get(0),
-                  RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
-        }
-        
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
-        // check memtable size and may asyncTryToFlush the work memtable
-        if (tsFileProcessor.shouldFlush()) {
-          fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+          recordInsertRowsFailure(insertRowsOfOneDeviceNode, 
subInsertRowsNode, e);
         }
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 73fca57a1fd..45c6d6f86cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -41,7 +41,7 @@ public class SubscriptionTaskSinkStage extends 
PipeTaskSinkStage {
 
   @Override
   protected void registerSubtask() {
-    this.connectorSubtaskId =
+    this.sinkSubtaskId =
         SubscriptionSinkSubtaskManager.instance()
             .register(
                 executor.get(),
@@ -56,22 +56,21 @@ public class SubscriptionTaskSinkStage extends 
PipeTaskSinkStage {
 
   @Override
   public void startSubtask() throws PipeException {
-    SubscriptionSinkSubtaskManager.instance().start(connectorSubtaskId);
+    SubscriptionSinkSubtaskManager.instance().start(sinkSubtaskId);
   }
 
   @Override
   public void stopSubtask() throws PipeException {
-    SubscriptionSinkSubtaskManager.instance().stop(connectorSubtaskId);
+    SubscriptionSinkSubtaskManager.instance().stop(sinkSubtaskId);
   }
 
   @Override
   public void dropSubtask() throws PipeException {
     SubscriptionSinkSubtaskManager.instance()
-        .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
+        .deregister(pipeName, creationTime, regionId, sinkSubtaskId);
   }
 
   public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
-    return SubscriptionSinkSubtaskManager.instance()
-        .getPipeConnectorPendingQueue(connectorSubtaskId);
+    return 
SubscriptionSinkSubtaskManager.instance().getPipeConnectorPendingQueue(sinkSubtaskId);
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 5082b8fc4f0..8eab3e6be49 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -27,14 +28,17 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.common.QueryId;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -62,6 +66,8 @@ import 
org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
@@ -78,6 +84,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,6 +94,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 
 public class DataRegionTest {
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
@@ -126,6 +142,7 @@ public class DataRegionTest {
       dataRegion.syncDeleteDataFiles();
       StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
     }
+    DataNodeSchemaCache.getInstance().cleanUp();
     EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
     CompactionTaskManager.getInstance().stop();
     EnvironmentUtils.cleanEnv();
@@ -975,6 +992,189 @@ public class DataRegionTest {
     dataRegion1.syncDeleteDataFiles();
   }
 
+  @Test
+  public void testInsertRowPropagatesTsFileProcessorCreationFailure()
+      throws IllegalPathException, DataRegionException, 
TsFileProcessorException {
+    final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, 
"root.fail_row");
+    dataRegion1.setTsFileProcessorSupplier(
+        (timePartitionId, sequence) -> {
+          throw new WriteProcessRejectException("mock creation failure");
+        });
+
+    final TSRecord record = new TSRecord(1, "root.fail_row");
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(1)));
+    final InsertRowNode insertRowNode = buildInsertRowNodeByTSRecord(record);
+
+    try {
+      dataRegion1.insert(insertRowNode);
+      Assert.fail("Expected WriteProcessRejectException");
+    } catch (WriteProcessRejectException e) {
+      Assert.assertTrue(e.getMessage().contains("mock creation failure"));
+    } catch (WriteProcessException e) {
+      Assert.fail("Expected WriteProcessRejectException but got " + 
e.getClass().getSimpleName());
+    } finally {
+      dataRegion1.syncDeleteDataFiles();
+    }
+  }
+
+  @Test
+  public void testInsertRowsMarkAllFailedRowsForSameProcessor() throws 
Exception {
+    final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, 
"root.fail_rows");
+    final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+    Mockito.doThrow(new WriteProcessException("mock insert rows failure"))
+        .when(processor)
+        .insertRows(any(InsertRowsNode.class), any(long[].class));
+    Mockito.when(processor.shouldFlush()).thenReturn(false);
+    Mockito.when(processor.isSequence()).thenReturn(true);
+    dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) -> 
processor);
+
+    final List<Integer> indexList = Arrays.asList(0, 1);
+    final List<InsertRowNode> nodes = new ArrayList<>();
+    for (long time : new long[] {1, 2}) {
+      final TSRecord record = new TSRecord(time, "root.fail_rows");
+      record.addTuple(
+          DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(time)));
+      nodes.add(buildInsertRowNodeByTSRecord(record));
+    }
+    final InsertRowsNode insertRowsNode = new InsertRowsNode(new 
PlanNodeId(""), indexList, nodes);
+
+    try {
+      dataRegion1.insert(insertRowsNode);
+      Assert.fail("Expected BatchProcessException");
+    } catch (BatchProcessException e) {
+      Assert.assertEquals(2, insertRowsNode.getResults().size());
+      Assert.assertEquals(
+          TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+          insertRowsNode.getResults().get(0).getCode());
+      Assert.assertEquals(
+          TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+          insertRowsNode.getResults().get(1).getCode());
+    } finally {
+      dataRegion1.syncDeleteDataFiles();
+    }
+  }
+
+  @Test
+  public void testInsertRowsLastCacheSkipsFailedRows() throws Exception {
+    final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+    COMMON_CONFIG.setLastCacheEnable(true);
+
+    final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, 
"root.cache_rows");
+    final TsFileProcessor successProcessor = 
Mockito.mock(TsFileProcessor.class);
+    Mockito.when(successProcessor.shouldFlush()).thenReturn(false);
+    Mockito.when(successProcessor.isSequence()).thenReturn(true);
+    final long failingTime = TimePartitionUtils.getTimePartitionInterval() + 1;
+    final long failingPartitionId = 
TimePartitionUtils.getTimePartitionId(failingTime);
+    dataRegion1.setTsFileProcessorSupplier(
+        (timePartitionId, sequence) -> {
+          if (timePartitionId == failingPartitionId) {
+            throw new WriteProcessException("mock row failure");
+          }
+          return successProcessor;
+        });
+
+    final MeasurementPath lastCachePath =
+        new MeasurementPath(
+            "root.cache_rows",
+            measurementId,
+            new MeasurementSchema(
+                measurementId, TSDataType.INT32, TSEncoding.PLAIN, 
CompressionType.UNCOMPRESSED));
+    
DataNodeSchemaCache.getInstance().declareLastCache(dataRegion1.getDatabaseName(),
 lastCachePath);
+
+    final List<Integer> indexList = Arrays.asList(0, 1);
+    final List<InsertRowNode> nodes = new ArrayList<>();
+    final long[] times = new long[] {1, failingTime};
+    final int[] values = new int[] {10, 20};
+    for (int i = 0; i < times.length; i++) {
+      final long time = times[i];
+      final TSRecord record = new TSRecord(time, "root.cache_rows");
+      record.addTuple(
+          DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(values[i])));
+      nodes.add(buildInsertRowNodeByTSRecord(record));
+    }
+    final InsertRowsNode insertRowsNode = new InsertRowsNode(new 
PlanNodeId(""), indexList, nodes);
+
+    try {
+      dataRegion1.insert(insertRowsNode);
+      Assert.fail("Expected BatchProcessException");
+    } catch (BatchProcessException e) {
+      final TimeValuePair lastCache =
+          DataNodeSchemaCache.getInstance().getLastCache(lastCachePath);
+      Assert.assertNotNull(lastCache);
+      Assert.assertEquals(1, lastCache.getTimestamp());
+      Assert.assertEquals(10, lastCache.getValue().getInt());
+    } finally {
+      dataRegion1.syncDeleteDataFiles();
+      COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+    }
+  }
+
+  @Test
+  public void testInsertTabletLastCacheSkipsFailedRows() throws Exception {
+    final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+    COMMON_CONFIG.setLastCacheEnable(true);
+
+    final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, 
"root.cache_tablet");
+    final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+    Mockito.doAnswer(
+            invocation -> {
+              TSStatus[] results = invocation.getArgument(3);
+              results[0] = RpcUtils.SUCCESS_STATUS;
+              results[1] =
+                  RpcUtils.getStatus(
+                      TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), 
"mock row failure");
+              throw new WriteProcessException("mock tablet failure");
+            })
+        .when(processor)
+        .insertTablet(any(InsertTabletNode.class), anyInt(), anyInt(), 
any(TSStatus[].class));
+    Mockito.when(processor.shouldFlush()).thenReturn(false);
+    Mockito.when(processor.isSequence()).thenReturn(true);
+    dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) -> 
processor);
+
+    final MeasurementPath lastCachePath =
+        new MeasurementPath(
+            "root.cache_tablet",
+            measurementId,
+            new MeasurementSchema(
+                measurementId, TSDataType.INT32, TSEncoding.PLAIN, 
CompressionType.UNCOMPRESSED));
+    
DataNodeSchemaCache.getInstance().declareLastCache(dataRegion1.getDatabaseName(),
 lastCachePath);
+
+    final String[] measurements = new String[] {measurementId};
+    final TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32};
+    final MeasurementSchema[] measurementSchemas =
+        new MeasurementSchema[] {
+          new MeasurementSchema(measurementId, TSDataType.INT32, 
TSEncoding.PLAIN)
+        };
+    final long[] times = new long[] {1, 2};
+    final Object[] columns = new Object[] {new int[] {10, 20}};
+    final InsertTabletNode insertTabletNode =
+        new InsertTabletNode(
+            new QueryId("test_write").genPlanNodeId(),
+            new PartialPath("root.cache_tablet"),
+            false,
+            measurements,
+            dataTypes,
+            measurementSchemas,
+            times,
+            null,
+            columns,
+            times.length);
+
+    try {
+      dataRegion1.insertTablet(insertTabletNode);
+      Assert.fail("Expected BatchProcessException");
+    } catch (BatchProcessException e) {
+      final TimeValuePair lastCache =
+          DataNodeSchemaCache.getInstance().getLastCache(lastCachePath);
+      Assert.assertNotNull(lastCache);
+      Assert.assertEquals(1, lastCache.getTimestamp());
+      Assert.assertEquals(10, lastCache.getValue().getInt());
+    } finally {
+      dataRegion1.syncDeleteDataFiles();
+      COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+    }
+  }
+
   @Test
   public void testSmallReportProportionInsertRow()
       throws WriteProcessException,
@@ -1490,6 +1690,32 @@ public class DataRegionTest {
     }
   }
 
+  private interface TsFileProcessorSupplier {
+    TsFileProcessor get(long timePartitionId, boolean sequence) throws 
WriteProcessException;
+  }
+
+  private static class HookedDataRegion extends DummyDataRegion {
+    private TsFileProcessorSupplier tsFileProcessorSupplier;
+
+    private HookedDataRegion(String systemInfoDir, String storageGroupName)
+        throws DataRegionException {
+      super(systemInfoDir, storageGroupName);
+    }
+
+    private void setTsFileProcessorSupplier(TsFileProcessorSupplier 
tsFileProcessorSupplier) {
+      this.tsFileProcessorSupplier = tsFileProcessorSupplier;
+    }
+
+    @Override
+    protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, 
boolean sequence)
+        throws WriteProcessException {
+      if (tsFileProcessorSupplier != null) {
+        return tsFileProcessorSupplier.get(timeRangeId, sequence);
+      }
+      return super.getOrCreateTsFileProcessor(timeRangeId, sequence);
+    }
+  }
+
   // -- test for deleting data directly
   // -- delete data and file only when:
   // 1. tsfile is closed
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 3ff47a2c5e1..46280ede66c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -284,7 +284,7 @@ public class ClientPoolFactory {
                       
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
                       
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       .build(),
-                  ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+                  ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
                   
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
                   .build()
@@ -310,7 +310,7 @@ public class ClientPoolFactory {
                       
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
                       .build(),
-                  ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+                  ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
                   
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
                   .build()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 696b6b8ce07..9a8beac5a43 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -129,10 +129,10 @@ public enum ThreadName {
   GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"),
   GROUP_MANAGEMENT("groupManagement"),
   // -------------------------- Compute --------------------------
-  PIPE_EXTRACTOR_DISRUPTOR("Pipe-Extractor-Disruptor"),
+  PIPE_SOURCE_DISRUPTOR("Pipe-Source-Disruptor"),
   PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
-  PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
   PIPE_CONSENSUS_EXECUTOR_POOL("Pipe-Consensus-Executor-Pool"),
+  PIPE_SINK_EXECUTOR_POOL("Pipe-Sink-Executor-Pool"),
   PIPE_CONFIGNODE_EXECUTOR_POOL("Pipe-ConfigNode-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
   PIPE_TSFILE_ASYNC_SEND_POOL("Pipe-TsFile-Async-Send-Pool"),
@@ -142,7 +142,7 @@ public enum ThreadName {
   PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
   PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER(
       "Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"),
-  PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
+  PIPE_ASYNC_SINK_CLIENT_POOL("Pipe-Async-Sink-Client-Pool"),
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
   PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
   PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"),
@@ -289,9 +289,9 @@ public enum ThreadName {
   private static final Set<ThreadName> computeThreadNames =
       new HashSet<>(
           Arrays.asList(
-              PIPE_EXTRACTOR_DISRUPTOR,
+              PIPE_SOURCE_DISRUPTOR,
               PIPE_PROCESSOR_EXECUTOR_POOL,
-              PIPE_CONNECTOR_EXECUTOR_POOL,
+              PIPE_SINK_EXECUTOR_POOL,
               PIPE_CONSENSUS_EXECUTOR_POOL,
               PIPE_CONFIGNODE_EXECUTOR_POOL,
               PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL,
@@ -300,7 +300,7 @@ public enum ThreadName {
               PIPE_RUNTIME_PROCEDURE_SUBMITTER,
               PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
               PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER,
-              PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
+              PIPE_ASYNC_SINK_CLIENT_POOL,
               PIPE_RECEIVER_AIR_GAP_AGENT,
               PIPE_AIR_GAP_RECEIVER,
               PIPE_PARALLEL_EXECUTION_POOL,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ec9ce06fd1d..3b7229927d1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -277,11 +277,11 @@ public class CommonConfig {
   private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
   private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
   private long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = 500;
-  private int pipeAsyncConnectorSelectorNumber =
+  private int pipeAsyncSinkSelectorNumber =
       Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
-  private int pipeAsyncConnectorMaxClientNumber =
+  private int pipeAsyncSinkMaxClientNumber =
       Math.max(32, Runtime.getRuntime().availableProcessors() * 2);
-  private int pipeAsyncConnectorMaxTsFileClientNumber =
+  private int pipeAsyncSinkMaxTsFileClientNumber =
       Math.max(16, Runtime.getRuntime().availableProcessors());
   private boolean printLogWhenEncounterException = false;
 
@@ -289,8 +289,7 @@ public class CommonConfig {
   private double pipeAllSinksRateLimitBytesPerSecond = -1;
   private int rateLimiterHotReloadCheckIntervalMs = 1000;
 
-  private int pipeConnectorRequestSliceThresholdBytes =
-      (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
+  private int pipeSinkRequestSliceThresholdBytes = (int) 
(RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
 
   private boolean isSeperatedPipeHeartbeatEnabled = true;
   private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 3;
@@ -1002,7 +1001,7 @@ public class CommonConfig {
   }
 
   public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
-    final int fPipeConnectorHandshakeTimeoutMs = 
this.pipeSinkHandshakeTimeoutMs;
+    final int fPipeSinkHandshakeTimeoutMs = this.pipeSinkHandshakeTimeoutMs;
     try {
       this.pipeSinkHandshakeTimeoutMs = 
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
     } catch (ArithmeticException e) {
@@ -1010,7 +1009,7 @@ public class CommonConfig {
       logger.warn(
          "Given pipe sink handshake timeout is too large, set to {} ms.", Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) 
{
+      if (fPipeSinkHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) {
         logger.info("pipeSinkHandshakeTimeoutMs is set to {}.", 
this.pipeSinkHandshakeTimeoutMs);
       }
     }
@@ -1041,16 +1040,16 @@ public class CommonConfig {
   }
 
   public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
-    final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
+    final int fPipeSinkTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
     try {
       this.pipeSinkTransferTimeoutMs = 
Math.toIntExact(pipeSinkTransferTimeoutMs);
     } catch (ArithmeticException e) {
       this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
       logger.warn(
-          "Given pipe connector transfer timeout is too large, set to {} ms.", 
Integer.MAX_VALUE);
+          "Given pipe sink transfer timeout is too large, set to {} ms.", 
Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
-        logger.info("pipeConnectorTransferTimeoutMs is set to {}.", 
pipeSinkTransferTimeoutMs);
+      if (fPipeSinkTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+        logger.info("pipeSinkTransferTimeoutMs is set to {}.", 
pipeSinkTransferTimeoutMs);
       }
     }
   }
@@ -1064,7 +1063,7 @@ public class CommonConfig {
       return;
     }
     this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
-    logger.info("pipeConnectorReadFileBufferSize is set to {}.", 
pipeSinkReadFileBufferSize);
+    logger.info("pipeSinkReadFileBufferSize is set to {}.", 
pipeSinkReadFileBufferSize);
   }
 
   public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
@@ -1164,60 +1163,58 @@ public class CommonConfig {
   }
 
   public int getPipeAsyncSinkSelectorNumber() {
-    return pipeAsyncConnectorSelectorNumber;
+    return pipeAsyncSinkSelectorNumber;
   }
 
-  public void setPipeAsyncConnectorSelectorNumber(int 
pipeAsyncConnectorSelectorNumber) {
-    if (pipeAsyncConnectorSelectorNumber <= 0) {
+  public void setPipeAsyncSinkSelectorNumber(int pipeAsyncSinkSelectorNumber) {
+    if (pipeAsyncSinkSelectorNumber <= 0) {
       logger.info(
-          "pipeAsyncConnectorSelectorNumber should be greater than 0, 
configuring it not to change.");
+          "pipeAsyncSinkSelectorNumber should be greater than 0, configuring 
it not to change.");
       return;
     }
-    pipeAsyncConnectorSelectorNumber = Math.max(4, 
pipeAsyncConnectorSelectorNumber);
-    if (this.pipeAsyncConnectorSelectorNumber == 
pipeAsyncConnectorSelectorNumber) {
+    pipeAsyncSinkSelectorNumber = Math.max(4, pipeAsyncSinkSelectorNumber);
+    if (this.pipeAsyncSinkSelectorNumber == pipeAsyncSinkSelectorNumber) {
       return;
     }
-    this.pipeAsyncConnectorSelectorNumber = pipeAsyncConnectorSelectorNumber;
-    logger.info("pipeAsyncConnectorSelectorNumber is set to {}.", 
pipeAsyncConnectorSelectorNumber);
+    this.pipeAsyncSinkSelectorNumber = pipeAsyncSinkSelectorNumber;
+    logger.info("pipeAsyncSinkSelectorNumber is set to {}.", 
pipeAsyncSinkSelectorNumber);
   }
 
   public int getPipeAsyncSinkMaxClientNumber() {
-    return pipeAsyncConnectorMaxClientNumber;
+    return pipeAsyncSinkMaxClientNumber;
   }
 
-  public void setPipeAsyncConnectorMaxClientNumber(int 
pipeAsyncConnectorMaxClientNumber) {
-    if (pipeAsyncConnectorMaxClientNumber <= 0) {
+  public void setPipeAsyncSinkMaxClientNumber(int 
pipeAsyncSinkMaxClientNumber) {
+    if (pipeAsyncSinkMaxClientNumber <= 0) {
       logger.info(
-          " pipeAsyncConnectorMaxClientNumber should be greater than 0, 
configuring it not to change.");
+          " pipeAsyncSinkMaxClientNumber should be greater than 0, configuring 
it not to change.");
       return;
     }
-    pipeAsyncConnectorMaxClientNumber = Math.max(32, 
pipeAsyncConnectorMaxClientNumber);
-    if (this.pipeAsyncConnectorMaxClientNumber == 
pipeAsyncConnectorMaxClientNumber) {
+    pipeAsyncSinkMaxClientNumber = Math.max(32, pipeAsyncSinkMaxClientNumber);
+    if (this.pipeAsyncSinkMaxClientNumber == pipeAsyncSinkMaxClientNumber) {
       return;
     }
-    this.pipeAsyncConnectorMaxClientNumber = pipeAsyncConnectorMaxClientNumber;
-    logger.info(
-        "pipeAsyncConnectorMaxClientNumber is set to {}.", 
pipeAsyncConnectorMaxClientNumber);
+    this.pipeAsyncSinkMaxClientNumber = pipeAsyncSinkMaxClientNumber;
+    logger.info("pipeAsyncSinkMaxClientNumber is set to {}.", 
pipeAsyncSinkMaxClientNumber);
   }
 
   public int getPipeAsyncSinkMaxTsFileClientNumber() {
-    return pipeAsyncConnectorMaxTsFileClientNumber;
+    return pipeAsyncSinkMaxTsFileClientNumber;
   }
 
-  public void setPipeAsyncConnectorMaxTsFileClientNumber(
-      int pipeAsyncConnectorMaxTsFileClientNumber) {
-    if (pipeAsyncConnectorMaxTsFileClientNumber <= 0) {
+  public void setPipeAsyncSinkMaxTsFileClientNumber(int 
pipeAsyncSinkMaxTsFileClientNumber) {
+    if (pipeAsyncSinkMaxTsFileClientNumber <= 0) {
       logger.info(
-          "pipeAsyncConnectorMaxTsFileClientNumber should be greater than 0, 
configuring it not to change.");
+          "pipeAsyncSinkMaxTsFileClientNumber should be greater than 0, 
configuring it not to change.");
       return;
     }
-    pipeAsyncConnectorMaxTsFileClientNumber = Math.max(16, 
pipeAsyncConnectorMaxTsFileClientNumber);
-    if (this.pipeAsyncConnectorMaxTsFileClientNumber == 
pipeAsyncConnectorMaxTsFileClientNumber) {
+    pipeAsyncSinkMaxTsFileClientNumber = Math.max(16, 
pipeAsyncSinkMaxTsFileClientNumber);
+    if (this.pipeAsyncSinkMaxTsFileClientNumber == 
pipeAsyncSinkMaxTsFileClientNumber) {
       return;
     }
-    this.pipeAsyncConnectorMaxTsFileClientNumber = 
pipeAsyncConnectorMaxTsFileClientNumber;
+    this.pipeAsyncSinkMaxTsFileClientNumber = 
pipeAsyncSinkMaxTsFileClientNumber;
     logger.info(
-        "pipeAsyncConnectorMaxClientNumber is set to {}.", 
pipeAsyncConnectorMaxTsFileClientNumber);
+        "pipeAsyncSinkMaxTsFileClientNumber is set to {}.", 
pipeAsyncSinkMaxTsFileClientNumber);
   }
 
   public boolean isPrintLogWhenEncounterException() {
@@ -1321,12 +1318,12 @@ public class CommonConfig {
     return pipeSinkRetryIntervalMs;
   }
 
-  public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
-    if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+  public void setPipeSinkRetryIntervalMs(long pipeSinkRetryIntervalMs) {
+    if (this.pipeSinkRetryIntervalMs == pipeSinkRetryIntervalMs) {
       return;
     }
-    this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
-    logger.info("pipeSinkRetryIntervalMs is set to {}", 
pipeConnectorRetryIntervalMs);
+    this.pipeSinkRetryIntervalMs = pipeSinkRetryIntervalMs;
+    logger.info("pipeSinkRetryIntervalMs is set to {}", 
pipeSinkRetryIntervalMs);
   }
 
   public boolean isPipeSinkRetryLocallyForConnectionError() {
@@ -2127,18 +2124,16 @@ public class CommonConfig {
   }
 
   public int getPipeSinkRequestSliceThresholdBytes() {
-    return pipeConnectorRequestSliceThresholdBytes;
+    return pipeSinkRequestSliceThresholdBytes;
   }
 
-  public void setPipeConnectorRequestSliceThresholdBytes(
-      int pipeConnectorRequestSliceThresholdBytes) {
-    if (this.pipeConnectorRequestSliceThresholdBytes == 
pipeConnectorRequestSliceThresholdBytes) {
+  public void setPipeSinkRequestSliceThresholdBytes(int 
pipeSinkRequestSliceThresholdBytes) {
+    if (this.pipeSinkRequestSliceThresholdBytes == 
pipeSinkRequestSliceThresholdBytes) {
       return;
     }
-    this.pipeConnectorRequestSliceThresholdBytes = 
pipeConnectorRequestSliceThresholdBytes;
+    this.pipeSinkRequestSliceThresholdBytes = 
pipeSinkRequestSliceThresholdBytes;
     logger.info(
-        "pipeConnectorRequestSliceThresholdBytes is set to {}",
-        pipeConnectorRequestSliceThresholdBytes);
+        "pipeSinkRequestSliceThresholdBytes is set to {}", pipeSinkRequestSliceThresholdBytes);
   }
 
   public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 02284671803..83db711e5ec 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -443,7 +443,7 @@ public class PipeDescriptor {
                 "rate_limiter_hot_reload_check_interval_ms",
                 
String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs()))));
 
-    config.setPipeConnectorRequestSliceThresholdBytes(
+    config.setPipeSinkRequestSliceThresholdBytes(
         Integer.parseInt(
             properties.getProperty(
                 "pipe_connector_request_slice_threshold_bytes",
@@ -613,7 +613,7 @@ public class PipeDescriptor {
             "pipe_async_connector_selector_number",
             isHotModify);
     if (value != null) {
-      config.setPipeAsyncConnectorSelectorNumber(Integer.parseInt(value));
+      config.setPipeAsyncSinkSelectorNumber(Integer.parseInt(value));
     }
 
     value =
@@ -623,7 +623,7 @@ public class PipeDescriptor {
             "pipe_async_connector_max_client_number",
             isHotModify);
     if (value != null) {
-      config.setPipeAsyncConnectorMaxClientNumber(Integer.parseInt(value));
+      config.setPipeAsyncSinkMaxClientNumber(Integer.parseInt(value));
     }
 
     value =
@@ -633,7 +633,7 @@ public class PipeDescriptor {
             "pipe_async_connector_max_tsfile_client_number",
             isHotModify);
     if (value != null) {
-      
config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value));
+      config.setPipeAsyncSinkMaxTsFileClientNumber(Integer.parseInt(value));
     }
 
     value =


Reply via email to