This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 11d827771fe Fixed multiple bugs of insertion (#17570)
11d827771fe is described below

commit 11d827771fe87e408637256e0119839195af55d3
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 12:26:42 2026 +0800

    Fixed multiple bugs of insertion (#17570)
    
    * insert-fix
    
    * source/sink
    
    * source/sink-2
    
    * sptls
    
    * fix
    
    * sink
    
    * compile
---
 .../pipe/sink/protocol/IoTDBConfigRegionSink.java  |   9 +-
 .../dataregion/PipeDataRegionPluginAgent.java      |   8 +-
 .../iotdb/db/pipe/agent/task/PipeDataNodeTask.java |  28 +--
 .../agent/task/builder/PipeDataNodeBuilder.java    |   8 +-
 .../task/execution/PipeSinkSubtaskExecutor.java    |   2 +-
 .../agent/task/stage/PipeTaskProcessorStage.java   |  16 +-
 .../pipe/agent/task/stage/PipeTaskSinkStage.java   |  13 +-
 .../pipe/agent/task/stage/PipeTaskSourceStage.java |  19 +-
 .../airgap/IoTDBSchemaRegionAirGapSink.java        |   7 +-
 .../PipeTransferTabletBatchEventHandler.java       |  10 +-
 .../PipeTransferTabletInsertNodeEventHandler.java  |   2 +-
 .../PipeTransferTabletInsertionEventHandler.java   |   7 +-
 .../handler/PipeTransferTabletRawEventHandler.java |   6 +-
 .../handler/PipeTransferTrackableHandler.java      |  24 +-
 .../async/handler/PipeTransferTsFileHandler.java   |  27 +-
 .../realtime/assigner/DisruptorQueue.java          |   4 +-
 .../listener/PipeInsertionDataNodeListener.java    |  36 +--
 .../planner/plan/node/write/InsertTabletNode.java  |  60 ++++-
 .../node/write/RelationalInsertTabletNode.java     |   6 +-
 .../db/storageengine/dataregion/DataRegion.java    | 275 +++++++++++++--------
 .../task/stage/SubscriptionTaskSinkStage.java      |  11 +-
 .../storageengine/dataregion/DataRegionTest.java   | 227 +++++++++++++++++
 .../iotdb/commons/client/ClientPoolFactory.java    |   4 +-
 .../iotdb/commons/concurrent/ThreadName.java       |  12 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  93 ++++---
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |   8 +-
 26 files changed, 627 insertions(+), 295 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index 36bd1cb1a5e..f06a47d4541 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -97,7 +97,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink {
   protected PipeTransferFilePieceReq getTransferSingleFilePieceReq(
       final String fileName, final long position, final byte[] payLoad) {
     throw new UnsupportedOperationException(
-        "The config region connector does not support transferring single file 
piece req.");
+        "The config region sink does not support transferring single file 
piece req.");
   }
 
   @Override
@@ -114,13 +114,13 @@ public class IoTDBConfigRegionSink extends 
IoTDBSslSyncSink {
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
-        "IoTDBConfigRegionConnector can't transfer TabletInsertionEvent.");
+        "IoTDBConfigRegionSink can't transfer TabletInsertionEvent.");
   }
 
   @Override
   public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
-        "IoTDBConfigRegionConnector can't transfer TsFileInsertionEvent.");
+        "IoTDBConfigRegionSink can't transfer TsFileInsertionEvent.");
   }
 
   @Override
@@ -130,8 +130,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink 
{
     } else if (event instanceof PipeConfigRegionSnapshotEvent) {
       doTransferWrapper((PipeConfigRegionSnapshotEvent) event);
     } else if (!(event instanceof PipeHeartbeatEvent)) {
-      LOGGER.warn(
-          "IoTDBConfigRegionConnector does not support transferring generic 
event: {}.", event);
+      LOGGER.warn("IoTDBConfigRegionSink does not support transferring generic 
event: {}.", event);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index 67a7d6549d9..60103e078fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -75,22 +75,22 @@ public class PipeDataRegionPluginAgent extends 
PipePluginAgent {
     // TODO: validate visibility for schema region and config region
     final Visibility pipeVisibility =
         VisibilityUtils.calculateFromExtractorParameters(new 
PipeParameters(sourceAttributes));
-    final Visibility extractorVisibility =
+    final Visibility sourceVisibility =
         
VisibilityUtils.calculateFromPluginClass(temporaryExtractor.getClass());
     final Visibility processorVisibility =
         
VisibilityUtils.calculateFromPluginClass(temporaryProcessor.getClass());
     final Visibility connectorVisibility =
         
VisibilityUtils.calculateFromPluginClass(temporaryConnector.getClass());
     if (!VisibilityUtils.isCompatible(
-        pipeVisibility, extractorVisibility, processorVisibility, 
connectorVisibility)) {
+        pipeVisibility, sourceVisibility, processorVisibility, 
connectorVisibility)) {
       throw new PipeParameterNotValidException(
           String.format(
-              "The visibility of the pipe (%s, %s) is not compatible with the 
visibility of the extractor (%s, %s, %s), processor (%s, %s, %s), and connector 
(%s, %s, %s).",
+              "The visibility of the pipe (%s, %s) is not compatible with the 
visibility of the source (%s, %s, %s), processor (%s, %s, %s), and connector 
(%s, %s, %s).",
               pipeName,
               pipeVisibility,
               sourceAttributes,
               temporaryExtractor.getClass().getName(),
-              extractorVisibility,
+              sourceVisibility,
               processorAttributes,
               temporaryProcessor.getClass().getName(),
               processorVisibility,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
index d33ec44a86e..0d0b955c210 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
@@ -32,32 +32,32 @@ public class PipeDataNodeTask implements PipeTask {
   private final String pipeName;
   private final int regionId;
 
-  private final PipeTaskStage extractorStage;
+  private final PipeTaskStage sourceStage;
   private final PipeTaskStage processorStage;
-  private final PipeTaskStage connectorStage;
+  private final PipeTaskStage sinkStage;
 
   private volatile boolean isCompleted = false;
 
   public PipeDataNodeTask(
       final String pipeName,
       final int regionId,
-      final PipeTaskStage extractorStage,
+      final PipeTaskStage sourceStage,
       final PipeTaskStage processorStage,
-      final PipeTaskStage connectorStage) {
+      final PipeTaskStage sinkStage) {
     this.pipeName = pipeName;
     this.regionId = regionId;
 
-    this.extractorStage = extractorStage;
+    this.sourceStage = sourceStage;
     this.processorStage = processorStage;
-    this.connectorStage = connectorStage;
+    this.sinkStage = sinkStage;
   }
 
   @Override
   public void create() {
     final long startTime = System.currentTimeMillis();
-    extractorStage.create();
+    sourceStage.create();
     processorStage.create();
-    connectorStage.create();
+    sinkStage.create();
     LOGGER.info(
         "Create pipe DN task {} successfully within {} ms",
         this,
@@ -67,9 +67,9 @@ public class PipeDataNodeTask implements PipeTask {
   @Override
   public void drop() {
     final long startTime = System.currentTimeMillis();
-    extractorStage.drop();
+    sourceStage.drop();
     processorStage.drop();
-    connectorStage.drop();
+    sinkStage.drop();
     LOGGER.info(
         "Drop pipe DN task {} successfully within {} ms",
         this,
@@ -79,9 +79,9 @@ public class PipeDataNodeTask implements PipeTask {
   @Override
   public void start() {
     final long startTime = System.currentTimeMillis();
-    extractorStage.start();
+    sourceStage.start();
     processorStage.start();
-    connectorStage.start();
+    sinkStage.start();
     LOGGER.info(
         "Start pipe DN task {} successfully within {} ms",
         this,
@@ -91,9 +91,9 @@ public class PipeDataNodeTask implements PipeTask {
   @Override
   public void stop() {
     final long startTime = System.currentTimeMillis();
-    extractorStage.stop();
+    sourceStage.stop();
     processorStage.stop();
-    connectorStage.stop();
+    sinkStage.stop();
     LOGGER.info(
         "Stop pipe DN task {} successfully within {} ms",
         this,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
index b55704a3da4..46a10135d88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
@@ -63,18 +63,18 @@ public class PipeDataNodeBuilder {
       final PipeTaskMeta pipeTaskMeta = 
consensusGroupIdToPipeTaskMeta.getValue();
 
       if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
-        final PipeParameters extractorParameters = 
pipeStaticMeta.getSourceParameters();
+        final PipeParameters sourceParameters = 
pipeStaticMeta.getSourceParameters();
         final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
         final boolean needConstructDataRegionTask =
             dataRegionIds.contains(dataRegionId)
                 && DataRegionListeningFilter.shouldDataRegionBeListened(
-                    extractorParameters, dataRegionId);
+                    sourceParameters, dataRegionId);
         final boolean needConstructSchemaRegionTask =
             schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
                 && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
-                    consensusGroupId, extractorParameters);
+                    consensusGroupId, sourceParameters);
 
-        // Advance the extractor parameters parsing logic to avoid creating 
un-relevant pipeTasks
+        // Advance the source parameters parsing logic to avoid creating 
un-relevant pipeTasks
         if (needConstructDataRegionTask || needConstructSchemaRegionTask) {
           consensusGroupIdToPipeTaskMap.put(
               consensusGroupId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
index 9a88ad74d7c..4abc8daed30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
@@ -31,7 +31,7 @@ public class PipeSinkSubtaskExecutor extends 
PipeSubtaskExecutor {
   public PipeSinkSubtaskExecutor() {
     super(
         PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
-        ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL.getName() + "-" + id.get(),
+        ThreadName.PIPE_SINK_EXECUTOR_POOL.getName() + "-" + id.get(),
         ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName() + "-" + 
id.getAndIncrement(),
         true);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 2e887f74bf0..2373495c8eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -56,8 +56,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
    * @param creationTime pipe creation time
    * @param pipeProcessorParameters used to create {@link PipeProcessor}
    * @param regionId {@link DataRegion} id
-   * @param pipeExtractorInputEventSupplier used to input {@link Event}s from 
{@link PipeExtractor}
-   * @param pipeConnectorOutputPendingQueue used to output {@link Event}s to 
{@link PipeConnector}
+   * @param pipeSourceInputEventSupplier used to input {@link Event}s from 
{@link PipeExtractor}
+   * @param pipeSinkOutputPendingQueue used to output {@link Event}s to {@link 
PipeConnector}
    * @throws PipeException if failed to {@link 
PipeProcessor#validate(PipeParameterValidator)} or
    *     {@link PipeProcessor#customize(PipeParameters, 
PipeProcessorRuntimeConfiguration)}}
    */
@@ -66,8 +66,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       final long creationTime,
       final PipeParameters pipeProcessorParameters,
       final int regionId,
-      final EventSupplier pipeExtractorInputEventSupplier,
-      final UnboundedBlockingPendingQueue<Event> 
pipeConnectorOutputPendingQueue,
+      final EventSupplier pipeSourceInputEventSupplier,
+      final UnboundedBlockingPendingQueue<Event> pipeSinkOutputPendingQueue,
       final PipeProcessorSubtaskExecutor executor,
       final PipeTaskMeta pipeTaskMeta,
       final boolean forceTabletFormat,
@@ -103,9 +103,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     // old one, so we need creationTime to make their hash code different in 
the map.
     final String taskId = pipeName + "_" + regionId + "_" + creationTime;
     final boolean isUsedForConsensusPipe = 
pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX);
-    final PipeEventCollector pipeConnectorOutputEventCollector =
+    final PipeEventCollector pipeSinkOutputEventCollector =
         new PipeEventCollector(
-            pipeConnectorOutputPendingQueue,
+            pipeSinkOutputPendingQueue,
             creationTime,
             regionId,
             forceTabletFormat,
@@ -117,9 +117,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             pipeName,
             creationTime,
             regionId,
-            pipeExtractorInputEventSupplier,
+            pipeSourceInputEventSupplier,
             pipeProcessor,
-            pipeConnectorOutputEventCollector);
+            pipeSinkOutputEventCollector);
 
     this.executor = executor;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index a22fbb536d7..88eac560cde 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -38,7 +38,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
   protected final int regionId;
   protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
 
-  protected String connectorSubtaskId;
+  protected String sinkSubtaskId;
 
   public PipeTaskSinkStage(
       String pipeName,
@@ -56,7 +56,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
   }
 
   protected void registerSubtask() {
-    this.connectorSubtaskId =
+    this.sinkSubtaskId =
         PipeSinkSubtaskManager.instance()
             .register(
                 executor,
@@ -71,21 +71,20 @@ public class PipeTaskSinkStage extends PipeTaskStage {
 
   @Override
   public void startSubtask() throws PipeException {
-    PipeSinkSubtaskManager.instance().start(connectorSubtaskId);
+    PipeSinkSubtaskManager.instance().start(sinkSubtaskId);
   }
 
   @Override
   public void stopSubtask() throws PipeException {
-    PipeSinkSubtaskManager.instance().stop(connectorSubtaskId);
+    PipeSinkSubtaskManager.instance().stop(sinkSubtaskId);
   }
 
   @Override
   public void dropSubtask() throws PipeException {
-    PipeSinkSubtaskManager.instance()
-        .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
+    PipeSinkSubtaskManager.instance().deregister(pipeName, creationTime, 
regionId, sinkSubtaskId);
   }
 
   public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
-    return 
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
+    return 
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(sinkSubtaskId);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
index 57a804df0d3..5f774ceb379 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
@@ -45,33 +45,32 @@ public class PipeTaskSourceStage extends PipeTaskStage {
   public PipeTaskSourceStage(
       String pipeName,
       long creationTime,
-      PipeParameters extractorParameters,
+      PipeParameters sourceParameters,
       int regionId,
       PipeTaskMeta pipeTaskMeta) {
     pipeExtractor =
         StorageEngine.getInstance().getAllDataRegionIds().contains(new 
DataRegionId(regionId))
                 || PipeRuntimeMeta.isSourceExternal(regionId)
-            ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSource(extractorParameters)
-            : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(extractorParameters);
+            ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSource(sourceParameters)
+            : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(sourceParameters);
 
-    // Validate and customize should be called before createSubtask. this 
allows extractor exposing
+    // Validate and customize should be called before createSubtask. this 
allows source exposing
     // exceptions in advance.
     try {
-      // 1. Validate extractor parameters
-      pipeExtractor.validate(new PipeParameterValidator(extractorParameters));
+      // 1. Validate source parameters
+      pipeExtractor.validate(new PipeParameterValidator(sourceParameters));
 
-      // 2. Customize extractor
+      // 2. Customize source
       final PipeTaskRuntimeConfiguration runtimeConfiguration =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(pipeName, creationTime, 
regionId, pipeTaskMeta));
-      pipeExtractor.customize(extractorParameters, runtimeConfiguration);
+      pipeExtractor.customize(sourceParameters, runtimeConfiguration);
     } catch (Exception e) {
       try {
         pipeExtractor.close();
       } catch (Exception closeException) {
         LOGGER.warn(
-            "Failed to close extractor after failed to initialize extractor. "
-                + "Ignore this exception.",
+            "Failed to close source after failed to initialize source. " + 
"Ignore this exception.",
             closeException);
       }
       throw new PipeException(e.getMessage(), e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
index e4d39b49523..bc056857c17 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
@@ -52,13 +52,13 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
-        "IoTDBSchemaRegionAirGapConnector can't transfer 
TabletInsertionEvent.");
+        "IoTDBSchemaRegionAirGapSink can't transfer TabletInsertionEvent.");
   }
 
   @Override
   public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
-        "IoTDBSchemaRegionAirGapConnector can't transfer 
TsFileInsertionEvent.");
+        "IoTDBSchemaRegionAirGapSink can't transfer TsFileInsertionEvent.");
   }
 
   @Override
@@ -73,8 +73,7 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
         doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event);
       } else if (!(event instanceof PipeHeartbeatEvent)) {
         LOGGER.warn(
-            "IoTDBSchemaRegionAirGapConnector does not support transferring 
generic event: {}.",
-            event);
+            "IoTDBSchemaRegionAirGapSink does not support transferring generic 
event: {}.", event);
       }
     } catch (final IOException e) {
       isSocketAlive.set(socketIndex, false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index d95513a2228..52c52b1038e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -69,7 +69,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
 
   public void transfer(final AsyncPipeDataTransferServiceClient client) throws 
TException {
     for (final Map.Entry<Pair<String, Long>, Long> entry : 
pipeName2BytesAccumulated.entrySet()) {
-      connector.rateLimitIfNeeded(
+      sink.rateLimitIfNeeded(
           entry.getKey().getLeft(),
           entry.getKey().getRight(),
           client.getEndPoint(),
@@ -92,13 +92,11 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
       // Only handle the failed statuses to avoid string format performance 
overhead
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
           && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-        connector
-            .statusHandler()
-            .handle(status, response.getStatus().getMessage(), 
events.toString());
+        sink.statusHandler().handle(status, response.getStatus().getMessage(), 
events.toString());
       }
       for (final Pair<String, TEndPoint> redirectPair :
           LeaderCacheUtils.parseRecommendedRedirections(status)) {
-        connector.updateLeaderCache(redirectPair.getLeft(), 
redirectPair.getRight());
+        sink.updateLeaderCache(redirectPair.getLeft(), 
redirectPair.getRight());
       }
 
       events.forEach(
@@ -123,7 +121,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
           events.size(),
           
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
     } finally {
-      connector.addFailureEventsToRetryQueue(events, exception);
+      sink.addFailureEventsToRetryQueue(events, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 70ba7f4cfc5..912a1e724f7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -46,7 +46,7 @@ public class PipeTransferTabletInsertNodeEventHandler
 
   @Override
   protected void updateLeaderCache(final TSStatus status) {
-    connector.updateLeaderCache(
+    sink.updateLeaderCache(
         ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), 
status.getRedirectNode());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index ca92af92b19..ac252818c14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -52,7 +52,7 @@ public abstract class PipeTransferTabletInsertionEventHandler 
extends PipeTransf
   }
 
   public void transfer(final AsyncPipeDataTransferServiceClient client) throws 
TException {
-    connector.rateLimitIfNeeded(
+    sink.rateLimitIfNeeded(
         event.getPipeName(), event.getCreationTime(), client.getEndPoint(), 
req.getBody().length);
 
     tryTransfer(client, req);
@@ -71,8 +71,7 @@ public abstract class PipeTransferTabletInsertionEventHandler 
extends PipeTransf
       // Only handle the failed statuses to avoid string format performance 
overhead
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
           && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-        connector
-            .statusHandler()
+        sink.statusHandler()
             .handle(response.getStatus(), response.getStatus().getMessage(), 
event.toString());
       }
       
event.decreaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName(),
 true);
@@ -98,7 +97,7 @@ public abstract class PipeTransferTabletInsertionEventHandler 
extends PipeTransf
           event.getCommitterKey(),
           event.getCommitId());
     } finally {
-      connector.addFailureEventToRetryQueue(event, exception);
+      sink.addFailureEventToRetryQueue(event, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index ff1daa05c28..b64e446827a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -32,8 +32,8 @@ public class PipeTransferTabletRawEventHandler extends 
PipeTransferTabletInserti
   public PipeTransferTabletRawEventHandler(
       final PipeRawTabletInsertionEvent event,
       final TPipeTransferReq req,
-      final IoTDBDataRegionAsyncSink connector) {
-    super(event, req, connector);
+      final IoTDBDataRegionAsyncSink sink) {
+    super(event, req, sink);
   }
 
   @Override
@@ -45,7 +45,7 @@ public class PipeTransferTabletRawEventHandler extends 
PipeTransferTabletInserti
 
   @Override
   protected void updateLeaderCache(final TSStatus status) {
-    connector.updateLeaderCache(
+    sink.updateLeaderCache(
         ((PipeRawTabletInsertionEvent) event).getDeviceId(), 
status.getRedirectNode());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 21f7c144bed..a8b4a3b7a79 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -36,18 +36,18 @@ public abstract class PipeTransferTrackableHandler
     implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
 
-  protected final IoTDBDataRegionAsyncSink connector;
+  protected final IoTDBDataRegionAsyncSink sink;
   protected volatile AsyncPipeDataTransferServiceClient client;
 
-  public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink 
connector) {
-    this.connector = connector;
+  public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) {
+    this.sink = sink;
   }
 
   @Override
   public void onComplete(final TPipeTransferResp response) {
-    if (connector.isClosed()) {
+    if (sink.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this, true);
+      sink.eliminateHandler(this, true);
       return;
     }
 
@@ -56,7 +56,7 @@ public abstract class PipeTransferTrackableHandler
       // completed
       // NOTE: We should not clear the reference count of events, as this 
would cause the
       // 
`org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3`
 test to fail.
-      connector.eliminateHandler(this, false);
+      sink.eliminateHandler(this, false);
     }
   }
 
@@ -67,14 +67,14 @@ public abstract class PipeTransferTrackableHandler
       client.setPrintLogWhenEncounterException(false);
     }
 
-    if (connector.isClosed()) {
+    if (sink.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this, true);
+      sink.eliminateHandler(this, true);
       return;
     }
 
     onErrorInternal(exception);
-    connector.eliminateHandler(this, false);
+    sink.eliminateHandler(this, false);
   }
 
   /**
@@ -93,10 +93,10 @@ public abstract class PipeTransferTrackableHandler
       this.client = client;
     }
     // track handler before checking if connector is closed
-    connector.trackHandler(this);
-    if (connector.isClosed()) {
+    sink.trackHandler(this);
+    if (sink.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this, true);
+      sink.eliminateHandler(this, true);
       client.setShouldReturnSelf(true);
       client.returnSelf(
           (e) -> {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3eaaa94c416..35a28d1413a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -162,7 +162,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     if (client == null) {
       LOGGER.warn(
           "Client has been returned to the pool. Current handler status is {}. 
Will not transfer {}.",
-          connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+          sink.isClosed() ? "CLOSED" : "NOT CLOSED",
           tsFile);
       return;
     }
@@ -171,7 +171,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     client.setTimeoutDynamically(clientManager.getConnectionTimeout());
 
     PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize);
-    if (connector.isEnableSendTsFileLimit()) {
+    if (sink.isEnableSendTsFileLimit()) {
       TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize);
     }
     final int readLength = reader.read(readBuffer);
@@ -200,11 +200,11 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
                     dataBaseName)
                 : PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                     tsFile.getName(), tsFile.length(), dataBaseName);
-        final TPipeTransferReq req = 
connector.compressIfNeeded(uncompressedReq);
+        final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
 
         pipeName2WeightMap.forEach(
             (pipePair, weight) ->
-                connector.rateLimitIfNeeded(
+                sink.rateLimitIfNeeded(
                     pipePair.getLeft(),
                     pipePair.getRight(),
                     client.getEndPoint(),
@@ -227,11 +227,11 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
                 currentFile.getName(), position, payload)
             : PipeTransferTsFilePieceReq.toTPipeTransferReq(
                 currentFile.getName(), position, payload);
-    final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
+    final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
 
     pipeName2WeightMap.forEach(
         (pipePair, weight) ->
-            connector.rateLimitIfNeeded(
+            sink.rateLimitIfNeeded(
                 pipePair.getLeft(),
                 pipePair.getRight(),
                 client.getEndPoint(),
@@ -249,7 +249,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     try {
       super.onComplete(response);
     } finally {
-      if (connector.isClosed()) {
+      if (sink.isClosed()) {
         returnClientIfNecessary();
       }
     }
@@ -263,8 +263,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         // Only handle the failed statuses to avoid string format performance 
overhead
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-          connector
-              .statusHandler()
+          sink.statusHandler()
               .handle(
                   status,
                   String.format(
@@ -338,9 +337,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         // Only handle the failed statuses to avoid string format performance 
overhead
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-          connector
-              .statusHandler()
-              .handle(status, response.getStatus().getMessage(), 
tsFile.getName());
+          sink.statusHandler().handle(status, 
response.getStatus().getMessage(), tsFile.getName());
         }
       }
 
@@ -412,7 +409,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         returnClientIfNecessary();
       } finally {
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
-          connector.addFailureEventsToRetryQueue(events, exception);
+          sink.addFailureEventsToRetryQueue(events, exception);
         }
       }
     }
@@ -423,7 +420,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       return;
     }
 
-    if (connector.isClosed()) {
+    if (sink.isClosed()) {
       closeClient();
     }
 
@@ -447,7 +444,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     if (client == null) {
       LOGGER.warn(
           "Client has been returned to the pool. Current handler status is {}. 
Will not transfer {}.",
-          connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+          sink.isClosed() ? "CLOSED" : "NOT CLOSED",
           tsFile);
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 52ac137ae4e..2019eba8560 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -35,13 +35,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.function.Consumer;
 
-import static 
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
+import static 
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_SOURCE_DISRUPTOR;
 
 public class DisruptorQueue {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DisruptorQueue.class);
   private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
-      new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
+      new IoTDBDaemonThreadFactory(PIPE_SOURCE_DISRUPTOR.getName());
 
   private final PipeMemoryBlock allocatedMemoryBlock;
   private final Disruptor<EventContainer> disruptor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index fded546d87d..157fb0078e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -45,33 +45,33 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * <p>All events extracted by this listener will be first published to 
different
  * PipeEventDataRegionAssigners (identified by data region id), and then 
PipeEventDataRegionAssigner
- * will filter events and assign them to different 
PipeRealtimeEventDataRegionExtractors.
+ * will filter events and assign them to different 
PipeRealtimeEventDataRegionSources.
  */
 public class PipeInsertionDataNodeListener {
   private final ConcurrentMap<Integer, PipeDataRegionAssigner> 
dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
-  private final AtomicInteger listenToTsFileExtractorCount = new 
AtomicInteger(0);
-  private final AtomicInteger listenToInsertNodeExtractorCount = new 
AtomicInteger(0);
+  private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
+  private final AtomicInteger listenToInsertNodeSourceCount = new 
AtomicInteger(0);
 
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
-      final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
+      final int dataRegionId, final PipeRealtimeDataRegionSource source) {
     dataRegionId2Assigner
         .computeIfAbsent(dataRegionId, o -> new 
PipeDataRegionAssigner(dataRegionId))
-        .startAssignTo(extractor);
+        .startAssignTo(source);
 
-    if (extractor.isNeedListenToTsFile()) {
-      listenToTsFileExtractorCount.incrementAndGet();
+    if (source.isNeedListenToTsFile()) {
+      listenToTsFileSourceCount.incrementAndGet();
     }
-    if (extractor.isNeedListenToInsertNode()) {
-      listenToInsertNodeExtractorCount.incrementAndGet();
+    if (source.isNeedListenToInsertNode()) {
+      listenToInsertNodeSourceCount.incrementAndGet();
     }
   }
 
   public synchronized void stopListenAndAssign(
-      final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
+      final int dataRegionId, final PipeRealtimeDataRegionSource source) {
     PipeDataRegionAssigner assignerToClose = null;
 
     synchronized (this) {
@@ -80,13 +80,13 @@ public class PipeInsertionDataNodeListener {
         return;
       }
 
-      assigner.stopAssignTo(extractor);
+      assigner.stopAssignTo(source);
 
-      if (extractor.isNeedListenToTsFile()) {
-        listenToTsFileExtractorCount.decrementAndGet();
+      if (source.isNeedListenToTsFile()) {
+        listenToTsFileSourceCount.decrementAndGet();
       }
-      if (extractor.isNeedListenToInsertNode()) {
-        listenToInsertNodeExtractorCount.decrementAndGet();
+      if (source.isNeedListenToInsertNode()) {
+        listenToInsertNodeSourceCount.decrementAndGet();
       }
 
       if (assigner.notMoreSourceNeededToBeAssigned()) {
@@ -110,8 +110,8 @@ public class PipeInsertionDataNodeListener {
       final String databaseName,
       final TsFileResource tsFileResource,
       final boolean isLoaded) {
-    // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on 
purpose
-    // because extractors may use tsfile events when some exceptions occur in 
the
+    // We don't judge whether listenToTsFileSourceCount.get() == 0 here on 
purpose
+    // because spirces may use tsfile events when some exceptions occur in the
     // insert nodes listening process.
 
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
@@ -131,7 +131,7 @@ public class PipeInsertionDataNodeListener {
       final String databaseName,
       final InsertNode insertNode,
       final TsFileResource tsFileResource) {
-    if (listenToInsertNodeExtractorCount.get() == 0) {
+    if (listenToInsertNodeSourceCount.get() == 0) {
       return;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 490f16ca5f0..995e8a95e3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -1193,36 +1193,72 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     if (lastIdx < startOffset) {
       return null;
     }
+    return composeTimeValuePair(measurementIndex, lastIdx);
+  }
+
+  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+    return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+  }
+
+  protected TimeValuePair composeLastTimeValuePair(
+      final int measurementIndex,
+      final TSStatus[] results,
+      final int startOffset,
+      final int endOffset) {
+    if (results == null) {
+      return composeLastTimeValuePair(measurementIndex, startOffset, 
endOffset);
+    }
+    if (measurementIndex >= columns.length || 
Objects.isNull(dataTypes[measurementIndex])) {
+      return null;
+    }
+
+    final BitMap bitMap = bitMaps == null ? null : bitMaps[measurementIndex];
+    int lastIdx = Math.min(endOffset - 1, rowCount - 1);
+    while (lastIdx >= startOffset) {
+      if (results[lastIdx] != null
+          && results[lastIdx].getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        lastIdx--;
+        continue;
+      }
+      if (bitMap != null && bitMap.isMarked(lastIdx)) {
+        lastIdx--;
+        continue;
+      }
+      break;
+    }
+    return lastIdx < startOffset ? null : 
composeTimeValuePair(measurementIndex, lastIdx);
+  }
 
+  private TimeValuePair composeTimeValuePair(final int measurementIndex, final 
int rowIndex) {
     TsPrimitiveType value;
     switch (dataTypes[measurementIndex]) {
       case INT32:
       case DATE:
         int[] intValues = (int[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsInt(intValues[lastIdx]);
+        value = new TsPrimitiveType.TsInt(intValues[rowIndex]);
         break;
       case INT64:
       case TIMESTAMP:
         long[] longValues = (long[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsLong(longValues[lastIdx]);
+        value = new TsPrimitiveType.TsLong(longValues[rowIndex]);
         break;
       case FLOAT:
         float[] floatValues = (float[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]);
+        value = new TsPrimitiveType.TsFloat(floatValues[rowIndex]);
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]);
+        value = new TsPrimitiveType.TsDouble(doubleValues[rowIndex]);
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]);
+        value = new TsPrimitiveType.TsBoolean(boolValues[rowIndex]);
         break;
       case TEXT:
       case BLOB:
       case STRING:
         Binary[] binaryValues = (Binary[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
+        value = new TsPrimitiveType.TsBinary(binaryValues[rowIndex]);
         break;
       case OBJECT:
         return null;
@@ -1230,11 +1266,7 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
         throw new UnSupportedDataTypeException(
             String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
     }
-    return new TimeValuePair(times[lastIdx], value);
-  }
-
-  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
-    return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+    return new TimeValuePair(times[rowIndex], value);
   }
 
   public IDeviceID getDeviceID(int rowIdx) {
@@ -1313,10 +1345,14 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   }
 
   public void updateLastCache(final String databaseName) {
+    updateLastCache(databaseName, null);
+  }
+
+  public void updateLastCache(final String databaseName, final TSStatus[] 
results) {
     final String[] rawMeasurements = getRawMeasurements();
     final TimeValuePair[] timeValuePairs = new 
TimeValuePair[rawMeasurements.length];
     for (int i = 0; i < rawMeasurements.length; i++) {
-      timeValuePairs[i] = composeLastTimeValuePair(i);
+      timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount);
     }
     TreeDeviceSchemaCacheManager.getInstance()
         .updateLastCacheIfExists(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index b9f7edbe87c..8d24ad77364 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -371,6 +371,10 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
 
   @Override
   public void updateLastCache(final String databaseName) {
+    updateLastCache(databaseName, null);
+  }
+
+  public void updateLastCache(final String databaseName, final TSStatus[] 
results) {
     final String[] rawMeasurements = getRawMeasurements();
 
     final List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs = 
splitByDevice(0, rowCount);
@@ -381,7 +385,7 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
 
       final TimeValuePair[] timeValuePairs = new 
TimeValuePair[rawMeasurements.length];
       for (int i = 0; i < rawMeasurements.length; i++) {
-        timeValuePairs[i] = composeLastTimeValuePair(i, startOffset, 
endOffset);
+        timeValuePairs[i] = composeLastTimeValuePair(i, results, startOffset, 
endOffset);
       }
       TableDeviceSchemaCache.getInstance()
           .updateLastCacheIfExists(databaseName, deviceID, rawMeasurements, 
timeValuePairs);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 933efbae84c..e5d75b4a210 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1455,7 +1455,7 @@ public class DataRegion implements IDataRegionForQuery {
         && !insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
       // disable updating last cache on follower
       long startTime = System.nanoTime();
-      tryToUpdateInsertTabletLastCache(insertTabletNode);
+      tryToUpdateInsertTabletLastCache(insertTabletNode, results);
       
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
     }
     return noFailure;
@@ -1506,18 +1506,12 @@ public class DataRegion implements IDataRegionForQuery {
       return true;
     }
 
-    TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, sequence);
-    if (tsFileProcessor == null) {
-      for (int[] rangePair : rangeList) {
-        int start = rangePair[0];
-        int end = rangePair[1];
-        for (int i = start; i < end; i++) {
-          results[i] =
-              RpcUtils.getStatus(
-                  TSStatusCode.INTERNAL_SERVER_ERROR,
-                  "can not create TsFileProcessor, timePartitionId: " + 
timePartitionId);
-        }
-      }
+    final TsFileProcessor tsFileProcessor;
+    try {
+      tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
+    } catch (WriteProcessException e) {
+      markInsertTabletRangesFailed(
+          rangeList, results, RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
       return false;
     }
 
@@ -1546,6 +1540,15 @@ public class DataRegion implements IDataRegionForQuery {
     return true;
   }
 
+  private void markInsertTabletRangesFailed(
+      final List<int[]> rangeList, final TSStatus[] results, final TSStatus 
failureStatus) {
+    for (int[] rangePair : rangeList) {
+      for (int i = rangePair[0]; i < rangePair[1]; i++) {
+        results[i] = failureStatus;
+      }
+    }
+  }
+
   private TableSchema getTableSchemaFromCache(
       final String database, final String tableName, final Pair<Long, Long> 
currentVersion) {
     final TableSchemaCacheKey key = new TableSchemaCacheKey(database, 
tableName);
@@ -1679,6 +1682,11 @@ public class DataRegion implements IDataRegionForQuery {
     node.updateLastCache(getDatabaseName());
   }
 
+  private void tryToUpdateInsertTabletLastCache(
+      final InsertTabletNode node, final TSStatus[] results) {
+    node.updateLastCache(getDatabaseName(), results);
+  }
+
   private TsFileProcessor insertToTsFileProcessor(
       InsertRowNode insertRowNode, boolean sequence, long timePartitionId)
       throws WriteProcessException {
@@ -1686,19 +1694,16 @@ public class DataRegion implements IDataRegionForQuery {
       return null;
     }
     TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, sequence);
-    if (tsFileProcessor == null) {
-      return null;
-    }
     long[] infoForMetrics = new long[5];
     // infoForMetrics[0]: CreateMemtableBlockTimeCost
     // infoForMetrics[1]: ScheduleMemoryBlockTimeCost
     // infoForMetrics[2]: ScheduleWalTimeCost
     // infoForMetrics[3]: ScheduleMemTableTimeCost
     // infoForMetrics[4]: InsertedPointsNumber
-    tsFileProcessor.insert(insertRowNode, infoForMetrics);
-    updateTsFileProcessorMetric(insertRowNode, infoForMetrics);
     // register TableSchema (and maybe more) for table insertion
     registerToTsFile(insertRowNode, tsFileProcessor);
+    tsFileProcessor.insert(insertRowNode, infoForMetrics);
+    updateTsFileProcessorMetric(insertRowNode, infoForMetrics);
     return tsFileProcessor;
   }
 
@@ -1717,9 +1722,11 @@ public class DataRegion implements IDataRegionForQuery {
       if (insertRowNode.allMeasurementFailed()) {
         continue;
       }
-      TsFileProcessor tsFileProcessor =
-          getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
-      if (tsFileProcessor == null) {
+      final TsFileProcessor tsFileProcessor;
+      try {
+        tsFileProcessor = getOrCreateTsFileProcessor(timePartitionIds[i], 
areSequence[i]);
+      } catch (WriteProcessException e) {
+        insertRowsNode.getResults().put(i, 
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
         continue;
       }
       int finalI = i;
@@ -1727,78 +1734,156 @@ public class DataRegion implements IDataRegionForQuery 
{
           tsFileProcessor,
           (k, v) -> {
             if (v == null) {
-              v = insertRowsNode.emptyClone();
-              v.setSearchIndex(insertRowNode.getSearchIndex());
-              v.setAligned(insertRowNode.isAligned());
-              if (insertRowNode.isGeneratedByPipe()) {
-                v.markAsGeneratedByPipe();
-              }
-              if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-                v.markAsGeneratedByRemoteConsensusLeader();
-              }
+              v = createGroupedInsertRowsNode(insertRowsNode, insertRowNode);
             }
-            if (v.isAligned() != insertRowNode.isAligned()) {
-              v.setMixingAlignment(true);
-            }
-            v.addOneInsertRowNode(insertRowNode, finalI);
-            v.updateProgressIndex(insertRowNode.getProgressIndex());
+            appendInsertRowNode(v, insertRowNode, finalI);
             return v;
           });
     }
 
     List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
     for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
-      TsFileProcessor tsFileProcessor = entry.getKey();
       InsertRowsNode subInsertRowsNode = entry.getValue();
       try {
-        tsFileProcessor =
-            insertRowsWithTypeConsistencyCheck(tsFileProcessor, 
subInsertRowsNode, infoForMetrics);
+        List<TsFileProcessor> insertedProcessors =
+            insertRowsWithTypeConsistencyCheck(entry.getKey(), 
subInsertRowsNode, infoForMetrics);
+        
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+        for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+          // check memtable size and may asyncTryToFlush the work memtable
+          if (tsFileProcessor.shouldFlush()) {
+            fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+          }
+        }
       } catch (WriteProcessException e) {
-        insertRowsNode
-            .getResults()
-            .put(
-                subInsertRowsNode.getInsertRowNodeIndexList().get(0),
-                RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
-      }
-      
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
-      // check memtable size and may asyncTryToFlush the work memtable
-      if (entry.getKey().shouldFlush()) {
-        fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+        recordInsertRowsFailure(insertRowsNode, subInsertRowsNode, e);
       }
     }
     return executedInsertRowNodeList;
   }
 
-  private TsFileProcessor insertRowsWithTypeConsistencyCheck(
+  private List<TsFileProcessor> insertRowsWithTypeConsistencyCheck(
       TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode, 
long[] infoForMetrics)
       throws WriteProcessException {
     try {
       // register TableSchema (and maybe more) for table insertion
       registerToTsFile(subInsertRowsNode, tsFileProcessor);
       tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+      return Collections.singletonList(tsFileProcessor);
     } catch (DataTypeInconsistentException e) {
       InsertRowNode firstRow = subInsertRowsNode.getInsertRowNodeList().get(0);
       long timePartitionId = 
TimePartitionUtils.getTimePartitionId(firstRow.getTime());
       // flush both MemTables so that the new type can be inserted into a new 
MemTable
-      TsFileProcessor workSequenceProcessor = 
workSequenceTsFileProcessors.get(timePartitionId);
-      if (workSequenceProcessor != null) {
-        fileFlushPolicy.apply(this, workSequenceProcessor, 
workSequenceProcessor.isSequence());
-      }
-      TsFileProcessor workUnsequenceProcessor = 
workUnsequenceTsFileProcessors.get(timePartitionId);
-      if (workUnsequenceProcessor != null) {
-        fileFlushPolicy.apply(this, workUnsequenceProcessor, 
workUnsequenceProcessor.isSequence());
-      }
+      flushWorkingProcessorsForTimePartition(timePartitionId);
+      return retryInsertRowsAfterFlush(subInsertRowsNode, timePartitionId, 
infoForMetrics);
+    }
+  }
 
-      boolean isSequence =
+  private InsertRowsNode createGroupedInsertRowsNode(
+      final InsertRowsNode sourceInsertRowsNode, final InsertRowNode 
firstInsertRowNode) {
+    final InsertRowsNode groupedInsertRowsNode = 
sourceInsertRowsNode.emptyClone();
+    initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+    return groupedInsertRowsNode;
+  }
+
+  private InsertRowsNode createGroupedInsertRowsNode(
+      final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+      final InsertRowNode firstInsertRowNode) {
+    final InsertRowsNode groupedInsertRowsNode =
+        new InsertRowsNode(sourceInsertRowsNode.getPlanNodeId());
+    initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+    return groupedInsertRowsNode;
+  }
+
+  private void initializeGroupedInsertRowsNode(
+      final InsertRowsNode groupedInsertRowsNode, final InsertRowNode 
firstInsertRowNode) {
+    groupedInsertRowsNode.setSearchIndex(firstInsertRowNode.getSearchIndex());
+    groupedInsertRowsNode.setAligned(firstInsertRowNode.isAligned());
+    if (firstInsertRowNode.isGeneratedByPipe()) {
+      groupedInsertRowsNode.markAsGeneratedByPipe();
+    }
+    if (firstInsertRowNode.isGeneratedByRemoteConsensusLeader()) {
+      groupedInsertRowsNode.markAsGeneratedByRemoteConsensusLeader();
+    }
+  }
+
+  private void appendInsertRowNode(
+      final InsertRowsNode groupedInsertRowsNode,
+      final InsertRowNode insertRowNode,
+      final int insertRowNodeIndex) {
+    if (groupedInsertRowsNode.isAligned() != insertRowNode.isAligned()) {
+      groupedInsertRowsNode.setMixingAlignment(true);
+    }
+    groupedInsertRowsNode.addOneInsertRowNode(insertRowNode, 
insertRowNodeIndex);
+    
groupedInsertRowsNode.updateProgressIndex(insertRowNode.getProgressIndex());
+  }
+
+  private void flushWorkingProcessorsForTimePartition(final long 
timePartitionId) {
+    TsFileProcessor workSequenceProcessor = 
workSequenceTsFileProcessors.get(timePartitionId);
+    if (workSequenceProcessor != null) {
+      fileFlushPolicy.apply(this, workSequenceProcessor, 
workSequenceProcessor.isSequence());
+    }
+    TsFileProcessor workUnsequenceProcessor = 
workUnsequenceTsFileProcessors.get(timePartitionId);
+    if (workUnsequenceProcessor != null) {
+      fileFlushPolicy.apply(this, workUnsequenceProcessor, 
workUnsequenceProcessor.isSequence());
+    }
+  }
+
+  private List<TsFileProcessor> retryInsertRowsAfterFlush(
+      final InsertRowsNode subInsertRowsNode,
+      final long timePartitionId,
+      final long[] infoForMetrics)
+      throws WriteProcessException {
+    final Map<TsFileProcessor, InsertRowsNode> retriedProcessorMap = new 
HashMap<>();
+    for (int i = 0; i < subInsertRowsNode.getInsertRowNodeList().size(); i++) {
+      final InsertRowNode insertRowNode = 
subInsertRowsNode.getInsertRowNodeList().get(i);
+      final boolean isSequence =
           config.isEnableSeparateData()
-              && firstRow.getTime()
-                  > lastFlushTimeMap.getFlushedTime(timePartitionId, 
firstRow.getDeviceID());
-      tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, 
isSequence);
-      registerToTsFile(subInsertRowsNode, tsFileProcessor);
-      tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+              && insertRowNode.getTime()
+                  > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
+      final TsFileProcessor retriedProcessor =
+          getOrCreateTsFileProcessor(timePartitionId, isSequence);
+      final int insertRowNodeIndex = 
subInsertRowsNode.getInsertRowNodeIndexList().get(i);
+      retriedProcessorMap.compute(
+          retriedProcessor,
+          (k, v) -> {
+            if (v == null) {
+              v = createGroupedInsertRowsNode(subInsertRowsNode, 
insertRowNode);
+            }
+            appendInsertRowNode(v, insertRowNode, insertRowNodeIndex);
+            return v;
+          });
+    }
+
+    final List<TsFileProcessor> insertedProcessors = new 
ArrayList<>(retriedProcessorMap.size());
+    for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry : 
retriedProcessorMap.entrySet()) {
+      final TsFileProcessor retriedProcessor = retriedEntry.getKey();
+      registerToTsFile(retriedEntry.getValue(), retriedProcessor);
+      retriedProcessor.insertRows(retriedEntry.getValue(), infoForMetrics);
+      insertedProcessors.add(retriedProcessor);
+    }
+    return insertedProcessors;
+  }
+
+  private void recordInsertRowsFailure(
+      final InsertRowsNode sourceInsertRowsNode,
+      final InsertRowsNode failedInsertRowsNode,
+      final WriteProcessException exception) {
+    final TSStatus failureStatus =
+        RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+    for (Integer failedIndex : 
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+      sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
+    }
+  }
+
+  private void recordInsertRowsFailure(
+      final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+      final InsertRowsNode failedInsertRowsNode,
+      final WriteProcessException exception) {
+    final TSStatus failureStatus =
+        RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+    for (Integer failedIndex : 
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+      sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
     }
-    return tsFileProcessor;
   }
 
   private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
@@ -1859,7 +1944,8 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean 
sequence) {
+  protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, 
boolean sequence)
+      throws WriteProcessException {
     TsFileProcessor tsFileProcessor = null;
     int retryCnt = 0;
     do {
@@ -1885,7 +1971,7 @@ public class DataRegion implements IDataRegionForQuery {
             "disk space is insufficient when creating TsFile processor, change 
system mode to read-only",
             e);
         
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
-        break;
+        throw new WriteProcessException(e.getMessage(), e.getErrorCode(), 
true);
       } catch (IOException e) {
         if (retryCnt < 3) {
           logger.warn("meet IOException when creating TsFileProcessor, retry 
it again", e);
@@ -1894,11 +1980,15 @@ public class DataRegion implements IDataRegionForQuery {
           logger.error(
               "meet IOException when creating TsFileProcessor, change system 
mode to error", e);
           
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
-          break;
+          throw new WriteProcessException(
+              String.format(
+                  "Failed to create TsFileProcessor for database %s, 
timePartitionId %s",
+                  databaseName, timeRangeId),
+              e);
         }
       } catch (ExceedQuotaException e) {
         logger.error(e.getMessage());
-        break;
+        throw new WriteProcessException(e.getMessage(), e.getErrorCode(), 
true);
       }
     } while (tsFileProcessor == null);
     return tsFileProcessor;
@@ -4490,8 +4580,13 @@ public class DataRegion implements IDataRegionForQuery {
             config.isEnableSeparateData()
                 && insertRowNode.getTime()
                     > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
-        TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, isSequence);
-        if (tsFileProcessor == null) {
+        final TsFileProcessor tsFileProcessor;
+        try {
+          tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, 
isSequence);
+        } catch (WriteProcessException e) {
+          insertRowsOfOneDeviceNode
+              .getResults()
+              .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
           continue;
         }
         int finalI = i;
@@ -4499,18 +4594,9 @@ public class DataRegion implements IDataRegionForQuery {
             tsFileProcessor,
             (k, v) -> {
               if (v == null) {
-                v = new 
InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId());
-                v.setSearchIndex(insertRowNode.getSearchIndex());
-                v.setAligned(insertRowNode.isAligned());
-                if (insertRowNode.isGeneratedByPipe()) {
-                  v.markAsGeneratedByPipe();
-                }
-                if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-                  v.markAsGeneratedByRemoteConsensusLeader();
-                }
+                v = createGroupedInsertRowsNode(insertRowsOfOneDeviceNode, 
insertRowNode);
               }
-              v.addOneInsertRowNode(insertRowNode, finalI);
-              v.updateProgressIndex(insertRowNode.getProgressIndex());
+              appendInsertRowNode(v, insertRowNode, finalI);
               return v;
             });
       }
@@ -4522,24 +4608,19 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[3]: ScheduleMemTableTimeCost
       // infoForMetrics[4]: InsertedPointsNumber
       for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
-        TsFileProcessor tsFileProcessor = entry.getKey();
         InsertRowsNode subInsertRowsNode = entry.getValue();
         try {
-          tsFileProcessor =
-              insertRowsWithTypeConsistencyCheck(
-                  tsFileProcessor, subInsertRowsNode, infoForMetrics);
+          List<TsFileProcessor> insertedProcessors =
+              insertRowsWithTypeConsistencyCheck(entry.getKey(), 
subInsertRowsNode, infoForMetrics);
+          
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+          for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+            // check memtable size and may asyncTryToFlush the work memtable
+            if (tsFileProcessor.shouldFlush()) {
+              fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+            }
+          }
         } catch (WriteProcessException e) {
-          insertRowsOfOneDeviceNode
-              .getResults()
-              .put(
-                  subInsertRowsNode.getInsertRowNodeIndexList().get(0),
-                  RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
-        }
-        
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
-        // check memtable size and may asyncTryToFlush the work memtable
-        if (tsFileProcessor.shouldFlush()) {
-          fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+          recordInsertRowsFailure(insertRowsOfOneDeviceNode, 
subInsertRowsNode, e);
         }
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 73fca57a1fd..45c6d6f86cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -41,7 +41,7 @@ public class SubscriptionTaskSinkStage extends 
PipeTaskSinkStage {
 
   @Override
   protected void registerSubtask() {
-    this.connectorSubtaskId =
+    this.sinkSubtaskId =
         SubscriptionSinkSubtaskManager.instance()
             .register(
                 executor.get(),
@@ -56,22 +56,21 @@ public class SubscriptionTaskSinkStage extends 
PipeTaskSinkStage {
 
   @Override
   public void startSubtask() throws PipeException {
-    SubscriptionSinkSubtaskManager.instance().start(connectorSubtaskId);
+    SubscriptionSinkSubtaskManager.instance().start(sinkSubtaskId);
   }
 
   @Override
   public void stopSubtask() throws PipeException {
-    SubscriptionSinkSubtaskManager.instance().stop(connectorSubtaskId);
+    SubscriptionSinkSubtaskManager.instance().stop(sinkSubtaskId);
   }
 
   @Override
   public void dropSubtask() throws PipeException {
     SubscriptionSinkSubtaskManager.instance()
-        .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
+        .deregister(pipeName, creationTime, regionId, sinkSubtaskId);
   }
 
   public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
-    return SubscriptionSinkSubtaskManager.instance()
-        .getPipeConnectorPendingQueue(connectorSubtaskId);
+    return 
SubscriptionSinkSubtaskManager.instance().getPipeConnectorPendingQueue(sinkSubtaskId);
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 68d76764920..19ac712882b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion;
 
 import org.apache.iotdb.calc.exception.QueryProcessException;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -32,8 +33,10 @@ import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.NonAlignedFullPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -46,6 +49,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -68,6 +72,8 @@ import 
org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
@@ -84,6 +90,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,6 +107,9 @@ import java.util.concurrent.Future;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
 
 public class DataRegionTest {
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
@@ -149,6 +159,7 @@ public class DataRegionTest {
       dataRegion.syncDeleteDataFiles();
       StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
     }
+    TreeDeviceSchemaCacheManager.getInstance().cleanUp();
     EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
     CompactionTaskManager.getInstance().stop();
     EnvironmentUtils.cleanEnv();
@@ -1064,6 +1075,196 @@ public class DataRegionTest {
     dataRegion1.syncDeleteDataFiles();
   }
 
+  @Test
+  public void testInsertRowPropagatesTsFileProcessorCreationFailure()
+      throws IllegalPathException, DataRegionException, 
TsFileProcessorException {
+    final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, 
"root.fail_row");
+    dataRegion1.setTsFileProcessorSupplier(
+        (timePartitionId, sequence) -> {
+          throw new WriteProcessRejectException("mock creation failure");
+        });
+
+    final TSRecord record = new TSRecord("root.fail_row", 1);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(1)));
+    final InsertRowNode insertRowNode = buildInsertRowNodeByTSRecord(record);
+
+    try {
+      dataRegion1.insert(insertRowNode);
+      Assert.fail("Expected WriteProcessRejectException");
+    } catch (WriteProcessRejectException e) {
+      Assert.assertTrue(e.getMessage().contains("mock creation failure"));
+    } catch (WriteProcessException e) {
+      Assert.fail("Expected WriteProcessRejectException but got " + 
e.getClass().getSimpleName());
+    } finally {
+      dataRegion1.syncDeleteDataFiles();
+    }
+  }
+
+  @Test
+  public void testInsertRowsMarkAllFailedRowsForSameProcessor() throws 
Exception {
+    final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, 
"root.fail_rows");
+    final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+    Mockito.doThrow(new WriteProcessException("mock insert rows failure"))
+        .when(processor)
+        .insertRows(any(InsertRowsNode.class), any(long[].class));
+    Mockito.when(processor.shouldFlush()).thenReturn(false);
+    Mockito.when(processor.isSequence()).thenReturn(true);
+    dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) -> 
processor);
+
+    final List<Integer> indexList = Arrays.asList(0, 1);
+    final List<InsertRowNode> nodes = new ArrayList<>();
+    for (long time : new long[] {1, 2}) {
+      final TSRecord record = new TSRecord("root.fail_rows", time);
+      record.addTuple(
+          DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(time)));
+      nodes.add(buildInsertRowNodeByTSRecord(record));
+    }
+    final InsertRowsNode insertRowsNode = new InsertRowsNode(new 
PlanNodeId(""), indexList, nodes);
+
+    try {
+      dataRegion1.insert(insertRowsNode);
+      Assert.fail("Expected BatchProcessException");
+    } catch (BatchProcessException e) {
+      Assert.assertEquals(2, insertRowsNode.getResults().size());
+      Assert.assertEquals(
+          TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+          insertRowsNode.getResults().get(0).getCode());
+      Assert.assertEquals(
+          TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+          insertRowsNode.getResults().get(1).getCode());
+    } finally {
+      dataRegion1.syncDeleteDataFiles();
+    }
+  }
+
+  @Test
+  public void testInsertRowsLastCacheSkipsFailedRows() throws Exception {
+    final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+    COMMON_CONFIG.setLastCacheEnable(true);
+
+    final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, 
"root.cache_rows");
+    final TsFileProcessor successProcessor = 
Mockito.mock(TsFileProcessor.class);
+    Mockito.when(successProcessor.shouldFlush()).thenReturn(false);
+    Mockito.when(successProcessor.isSequence()).thenReturn(true);
+    final long failingTime = TimePartitionUtils.getTimePartitionInterval() + 1;
+    final long failingPartitionId = 
TimePartitionUtils.getTimePartitionId(failingTime);
+    dataRegion1.setTsFileProcessorSupplier(
+        (timePartitionId, sequence) -> {
+          if (timePartitionId == failingPartitionId) {
+            throw new WriteProcessException("mock row failure");
+          }
+          return successProcessor;
+        });
+
+    final MeasurementPath lastCachePath =
+        new MeasurementPath(
+            "root.cache_rows",
+            measurementId,
+            new MeasurementSchema(
+                measurementId, TSDataType.INT32, TSEncoding.PLAIN, 
CompressionType.UNCOMPRESSED));
+    TreeDeviceSchemaCacheManager.getInstance()
+        .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath);
+
+    final List<Integer> indexList = Arrays.asList(0, 1);
+    final List<InsertRowNode> nodes = new ArrayList<>();
+    final long[] times = new long[] {1, failingTime};
+    final int[] values = new int[] {10, 20};
+    for (int i = 0; i < times.length; i++) {
+      final long time = times[i];
+      final TSRecord record = new TSRecord("root.cache_rows", time);
+      record.addTuple(
+          DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(values[i])));
+      nodes.add(buildInsertRowNodeByTSRecord(record));
+    }
+    final InsertRowsNode insertRowsNode = new InsertRowsNode(new 
PlanNodeId(""), indexList, nodes);
+
+    try {
+      dataRegion1.insert(insertRowsNode);
+      Assert.fail("Expected BatchProcessException");
+    } catch (BatchProcessException e) {
+      final TimeValuePair lastCache =
+          
TreeDeviceSchemaCacheManager.getInstance().getLastCache(lastCachePath);
+      Assert.assertNotNull(lastCache);
+      Assert.assertEquals(1, lastCache.getTimestamp());
+      Assert.assertEquals(10, lastCache.getValue().getInt());
+    } finally {
+      dataRegion1.syncDeleteDataFiles();
+      COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+    }
+  }
+
+  @Test
+  public void testInsertTabletLastCacheSkipsFailedRows() throws Exception {
+    final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+    COMMON_CONFIG.setLastCacheEnable(true);
+
+    final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, 
"root.cache_tablet");
+    final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+    Mockito.doAnswer(
+            invocation -> {
+              TSStatus[] results = invocation.getArgument(2);
+              results[0] = RpcUtils.SUCCESS_STATUS;
+              results[1] =
+                  RpcUtils.getStatus(
+                      TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), 
"mock row failure");
+              throw new WriteProcessException("mock tablet failure");
+            })
+        .when(processor)
+        .insertTablet(
+            any(InsertTabletNode.class),
+            anyList(),
+            any(TSStatus[].class),
+            anyBoolean(),
+            any(long[].class));
+    Mockito.when(processor.shouldFlush()).thenReturn(false);
+    Mockito.when(processor.isSequence()).thenReturn(true);
+    dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) -> 
processor);
+
+    final MeasurementPath lastCachePath =
+        new MeasurementPath(
+            "root.cache_tablet",
+            measurementId,
+            new MeasurementSchema(
+                measurementId, TSDataType.INT32, TSEncoding.PLAIN, 
CompressionType.UNCOMPRESSED));
+    TreeDeviceSchemaCacheManager.getInstance()
+        .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath);
+
+    final String[] measurements = new String[] {measurementId};
+    final TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32};
+    final MeasurementSchema[] measurementSchemas =
+        new MeasurementSchema[] {
+          new MeasurementSchema(measurementId, TSDataType.INT32, 
TSEncoding.PLAIN)
+        };
+    final long[] times = new long[] {1, 2};
+    final Object[] columns = new Object[] {new int[] {10, 20}};
+    final InsertTabletNode insertTabletNode =
+        new InsertTabletNode(
+            new QueryId("test_write").genPlanNodeId(),
+            new PartialPath("root.cache_tablet"),
+            false,
+            measurements,
+            dataTypes,
+            measurementSchemas,
+            times,
+            null,
+            columns,
+            times.length);
+
+    try {
+      dataRegion1.insertTablet(insertTabletNode);
+      Assert.fail("Expected BatchProcessException");
+    } catch (BatchProcessException e) {
+      final TimeValuePair lastCache =
+          
TreeDeviceSchemaCacheManager.getInstance().getLastCache(lastCachePath);
+      Assert.assertNotNull(lastCache);
+      Assert.assertEquals(1, lastCache.getTimestamp());
+      Assert.assertEquals(10, lastCache.getValue().getInt());
+    } finally {
+      dataRegion1.syncDeleteDataFiles();
+      COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+    }
+  }
+
   @Test
   public void testSmallReportProportionInsertRow()
       throws WriteProcessException,
@@ -1667,6 +1868,32 @@ public class DataRegionTest {
     }
   }
 
+  private interface TsFileProcessorSupplier {
+    TsFileProcessor get(long timePartitionId, boolean sequence) throws 
WriteProcessException;
+  }
+
+  private static class HookedDataRegion extends DummyDataRegion {
+    private TsFileProcessorSupplier tsFileProcessorSupplier;
+
+    private HookedDataRegion(String systemInfoDir, String storageGroupName)
+        throws DataRegionException {
+      super(systemInfoDir, storageGroupName);
+    }
+
+    private void setTsFileProcessorSupplier(TsFileProcessorSupplier 
tsFileProcessorSupplier) {
+      this.tsFileProcessorSupplier = tsFileProcessorSupplier;
+    }
+
+    @Override
+    protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, 
boolean sequence)
+        throws WriteProcessException {
+      if (tsFileProcessorSupplier != null) {
+        return tsFileProcessorSupplier.get(timeRangeId, sequence);
+      }
+      return super.getOrCreateTsFileProcessor(timeRangeId, sequence);
+    }
+  }
+
   // -- test for deleting data directly
   // -- delete data and file only when:
   // 1. tsfile is closed
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index a5a6dd3808e..ea20f9c76cc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -285,7 +285,7 @@ public class ClientPoolFactory {
                       
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
                       
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       .build(),
-                  ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+                  ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
                   
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
                   .build()
@@ -311,7 +311,7 @@ public class ClientPoolFactory {
                       
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
                       .build(),
-                  ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+                  ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
                   
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
                   .build()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 81f2aa7156c..20fa9d78d59 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -136,9 +136,9 @@ public enum ThreadName {
   GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"),
   GROUP_MANAGEMENT("groupManagement"),
   // -------------------------- Compute --------------------------
-  PIPE_EXTRACTOR_DISRUPTOR("Pipe-Extractor-Disruptor"),
+  PIPE_SOURCE_DISRUPTOR("Pipe-Source-Disruptor"),
   PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
-  PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
+  PIPE_SINK_EXECUTOR_POOL("Pipe-Sink-Executor-Pool"),
   IOT_CONSENSUS_V2_EXECUTOR_POOL("Pipe-Consensus-Executor-Pool"),
   PIPE_CONFIGNODE_EXECUTOR_POOL("Pipe-ConfigNode-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
@@ -149,7 +149,7 @@ public enum ThreadName {
   PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
   PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER(
       "Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"),
-  PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
+  PIPE_ASYNC_SINK_CLIENT_POOL("Pipe-Async-Sink-Client-Pool"),
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
   PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
   PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"),
@@ -302,9 +302,9 @@ public enum ThreadName {
   private static final Set<ThreadName> computeThreadNames =
       new HashSet<>(
           Arrays.asList(
-              PIPE_EXTRACTOR_DISRUPTOR,
+              PIPE_SOURCE_DISRUPTOR,
               PIPE_PROCESSOR_EXECUTOR_POOL,
-              PIPE_CONNECTOR_EXECUTOR_POOL,
+              PIPE_SINK_EXECUTOR_POOL,
               IOT_CONSENSUS_V2_EXECUTOR_POOL,
               PIPE_CONFIGNODE_EXECUTOR_POOL,
               PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL,
@@ -313,7 +313,7 @@ public enum ThreadName {
               PIPE_RUNTIME_PROCEDURE_SUBMITTER,
               PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
               PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER,
-              PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
+              PIPE_ASYNC_SINK_CLIENT_POOL,
               PIPE_RECEIVER_AIR_GAP_AGENT,
               PIPE_AIR_GAP_RECEIVER,
               PIPE_PARALLEL_EXECUTION_POOL,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index df100b3874d..cbed4db547e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -281,11 +281,11 @@ public class CommonConfig {
   private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
   private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
   private long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = 500;
-  private int pipeAsyncConnectorSelectorNumber =
+  private int pipeAsyncSinkSelectorNumber =
       Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
-  private int pipeAsyncConnectorMaxClientNumber =
+  private int pipeAsyncSinkMaxClientNumber =
       Math.max(32, Runtime.getRuntime().availableProcessors() * 2);
-  private int pipeAsyncConnectorMaxTsFileClientNumber =
+  private int pipeAsyncSinkMaxTsFileClientNumber =
       Math.max(16, Runtime.getRuntime().availableProcessors());
   private boolean printLogWhenEncounterException = false;
 
@@ -293,8 +293,7 @@ public class CommonConfig {
   private double pipeAllSinksRateLimitBytesPerSecond = -1;
   private int rateLimiterHotReloadCheckIntervalMs = 1000;
 
-  private int pipeConnectorRequestSliceThresholdBytes =
-      (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
+  private int pipeSinkRequestSliceThresholdBytes = (int) 
(RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
 
   private boolean isSeperatedPipeHeartbeatEnabled = true;
   private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 3;
@@ -1078,7 +1077,7 @@ public class CommonConfig {
   }
 
   public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
-    final int fPipeConnectorHandshakeTimeoutMs = 
this.pipeSinkHandshakeTimeoutMs;
+    final int fPipeSinkHandshakeTimeoutMs = this.pipeSinkHandshakeTimeoutMs;
     try {
       this.pipeSinkHandshakeTimeoutMs = 
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
     } catch (ArithmeticException e) {
@@ -1086,7 +1085,7 @@ public class CommonConfig {
       logger.warn(
           "Given pipe connector handshake timeout is too large, set to {} 
ms.", Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) 
{
+      if (fPipeSinkHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) {
         logger.info(
             "pipeConnectorHandshakeTimeoutMs is set to {}.", 
this.pipeSinkHandshakeTimeoutMs);
       }
@@ -1118,16 +1117,16 @@ public class CommonConfig {
   }
 
   public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
-    final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
+    final int fPipeSinkTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
     try {
       this.pipeSinkTransferTimeoutMs = 
Math.toIntExact(pipeSinkTransferTimeoutMs);
     } catch (ArithmeticException e) {
       this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
       logger.warn(
-          "Given pipe connector transfer timeout is too large, set to {} ms.", 
Integer.MAX_VALUE);
+          "Given pipe sink transfer timeout is too large, set to {} ms.", 
Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
-        logger.info("pipeConnectorTransferTimeoutMs is set to {}.", 
pipeSinkTransferTimeoutMs);
+      if (fPipeSinkTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+        logger.info("pipeSinkTransferTimeoutMs is set to {}.", 
pipeSinkTransferTimeoutMs);
       }
     }
   }
@@ -1141,7 +1140,7 @@ public class CommonConfig {
       return;
     }
     this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
-    logger.info("pipeConnectorReadFileBufferSize is set to {}.", 
pipeSinkReadFileBufferSize);
+    logger.info("pipeSinkReadFileBufferSize is set to {}.", 
pipeSinkReadFileBufferSize);
   }
 
   public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
@@ -1241,60 +1240,58 @@ public class CommonConfig {
   }
 
   public int getPipeAsyncSinkSelectorNumber() {
-    return pipeAsyncConnectorSelectorNumber;
+    return pipeAsyncSinkSelectorNumber;
   }
 
-  public void setPipeAsyncConnectorSelectorNumber(int 
pipeAsyncConnectorSelectorNumber) {
-    if (pipeAsyncConnectorSelectorNumber <= 0) {
+  public void setPipeAsyncSinkSelectorNumber(int pipeAsyncSinkSelectorNumber) {
+    if (pipeAsyncSinkSelectorNumber <= 0) {
       logger.info(
-          "pipeAsyncConnectorSelectorNumber should be greater than 0, 
configuring it not to change.");
+          "pipeAsyncSinkSelectorNumber should be greater than 0, configuring 
it not to change.");
       return;
     }
-    pipeAsyncConnectorSelectorNumber = Math.max(4, 
pipeAsyncConnectorSelectorNumber);
-    if (this.pipeAsyncConnectorSelectorNumber == 
pipeAsyncConnectorSelectorNumber) {
+    pipeAsyncSinkSelectorNumber = Math.max(4, pipeAsyncSinkSelectorNumber);
+    if (this.pipeAsyncSinkSelectorNumber == pipeAsyncSinkSelectorNumber) {
       return;
     }
-    this.pipeAsyncConnectorSelectorNumber = pipeAsyncConnectorSelectorNumber;
-    logger.info("pipeAsyncConnectorSelectorNumber is set to {}.", 
pipeAsyncConnectorSelectorNumber);
+    this.pipeAsyncSinkSelectorNumber = pipeAsyncSinkSelectorNumber;
+    logger.info("pipeAsyncSinkSelectorNumber is set to {}.", 
pipeAsyncSinkSelectorNumber);
   }
 
   public int getPipeAsyncSinkMaxClientNumber() {
-    return pipeAsyncConnectorMaxClientNumber;
+    return pipeAsyncSinkMaxClientNumber;
   }
 
-  public void setPipeAsyncConnectorMaxClientNumber(int 
pipeAsyncConnectorMaxClientNumber) {
-    if (pipeAsyncConnectorMaxClientNumber <= 0) {
+  public void setPipeAsyncSinkMaxClientNumber(int 
pipeAsyncSinkMaxClientNumber) {
+    if (pipeAsyncSinkMaxClientNumber <= 0) {
       logger.info(
-          " pipeAsyncConnectorMaxClientNumber should be greater than 0, 
configuring it not to change.");
+          " pipeAsyncSinkMaxClientNumber should be greater than 0, configuring 
it not to change.");
       return;
     }
-    pipeAsyncConnectorMaxClientNumber = Math.max(32, 
pipeAsyncConnectorMaxClientNumber);
-    if (this.pipeAsyncConnectorMaxClientNumber == 
pipeAsyncConnectorMaxClientNumber) {
+    pipeAsyncSinkMaxClientNumber = Math.max(32, pipeAsyncSinkMaxClientNumber);
+    if (this.pipeAsyncSinkMaxClientNumber == pipeAsyncSinkMaxClientNumber) {
       return;
     }
-    this.pipeAsyncConnectorMaxClientNumber = pipeAsyncConnectorMaxClientNumber;
-    logger.info(
-        "pipeAsyncConnectorMaxClientNumber is set to {}.", 
pipeAsyncConnectorMaxClientNumber);
+    this.pipeAsyncSinkMaxClientNumber = pipeAsyncSinkMaxClientNumber;
+    logger.info("pipeAsyncSinkMaxClientNumber is set to {}.", 
pipeAsyncSinkMaxClientNumber);
   }
 
   public int getPipeAsyncSinkMaxTsFileClientNumber() {
-    return pipeAsyncConnectorMaxTsFileClientNumber;
+    return pipeAsyncSinkMaxTsFileClientNumber;
   }
 
-  public void setPipeAsyncConnectorMaxTsFileClientNumber(
-      int pipeAsyncConnectorMaxTsFileClientNumber) {
-    if (pipeAsyncConnectorMaxTsFileClientNumber <= 0) {
+  public void setPipeAsyncSinkMaxTsFileClientNumber(int 
pipeAsyncSinkMaxTsFileClientNumber) {
+    if (pipeAsyncSinkMaxTsFileClientNumber <= 0) {
       logger.info(
-          "pipeAsyncConnectorMaxTsFileClientNumber should be greater than 0, 
configuring it not to change.");
+          "pipeAsyncSinkMaxTsFileClientNumber should be greater than 0, 
configuring it not to change.");
       return;
     }
-    pipeAsyncConnectorMaxTsFileClientNumber = Math.max(16, 
pipeAsyncConnectorMaxTsFileClientNumber);
-    if (this.pipeAsyncConnectorMaxTsFileClientNumber == 
pipeAsyncConnectorMaxTsFileClientNumber) {
+    pipeAsyncSinkMaxTsFileClientNumber = Math.max(16, 
pipeAsyncSinkMaxTsFileClientNumber);
+    if (this.pipeAsyncSinkMaxTsFileClientNumber == 
pipeAsyncSinkMaxTsFileClientNumber) {
       return;
     }
-    this.pipeAsyncConnectorMaxTsFileClientNumber = 
pipeAsyncConnectorMaxTsFileClientNumber;
+    this.pipeAsyncSinkMaxTsFileClientNumber = 
pipeAsyncSinkMaxTsFileClientNumber;
     logger.info(
-        "pipeAsyncConnectorMaxClientNumber is set to {}.", 
pipeAsyncConnectorMaxTsFileClientNumber);
+        "pipeAsyncSinkMaxTsFileClientNumber is set to {}.", 
pipeAsyncSinkMaxTsFileClientNumber);
   }
 
   public boolean isPrintLogWhenEncounterException() {
@@ -1398,12 +1395,12 @@ public class CommonConfig {
     return pipeSinkRetryIntervalMs;
   }
 
-  public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
-    if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+  public void setPipeSinkRetryIntervalMs(long pipeSinkRetryIntervalMs) {
+    if (this.pipeSinkRetryIntervalMs == pipeSinkRetryIntervalMs) {
       return;
     }
-    this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
-    logger.info("pipeSinkRetryIntervalMs is set to {}", 
pipeConnectorRetryIntervalMs);
+    this.pipeSinkRetryIntervalMs = pipeSinkRetryIntervalMs;
+    logger.info("pipeSinkRetryIntervalMs is set to {}", 
pipeSinkRetryIntervalMs);
   }
 
   public boolean isPipeSinkRetryLocallyForConnectionError() {
@@ -2204,18 +2201,16 @@ public class CommonConfig {
   }
 
   public int getPipeSinkRequestSliceThresholdBytes() {
-    return pipeConnectorRequestSliceThresholdBytes;
+    return pipeSinkRequestSliceThresholdBytes;
   }
 
-  public void setPipeConnectorRequestSliceThresholdBytes(
-      int pipeConnectorRequestSliceThresholdBytes) {
-    if (this.pipeConnectorRequestSliceThresholdBytes == 
pipeConnectorRequestSliceThresholdBytes) {
+  public void setPipeSinkRequestSliceThresholdBytes(int 
pipeSinkRequestSliceThresholdBytes) {
+    if (this.pipeSinkRequestSliceThresholdBytes == 
pipeSinkRequestSliceThresholdBytes) {
       return;
     }
-    this.pipeConnectorRequestSliceThresholdBytes = 
pipeConnectorRequestSliceThresholdBytes;
+    this.pipeSinkRequestSliceThresholdBytes = 
pipeSinkRequestSliceThresholdBytes;
     logger.info(
-        "pipeConnectorRequestSliceThresholdBytes is set to {}",
-        pipeConnectorRequestSliceThresholdBytes);
+        "pipeConnectorRequestSliceThresholdBytes is set to {}", 
pipeSinkRequestSliceThresholdBytes);
   }
 
   public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 8dcbce35094..d0c37a50367 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -443,7 +443,7 @@ public class PipeDescriptor {
                 "rate_limiter_hot_reload_check_interval_ms",
                 
String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs()))));
 
-    config.setPipeConnectorRequestSliceThresholdBytes(
+    config.setPipeSinkRequestSliceThresholdBytes(
         Integer.parseInt(
             properties.getProperty(
                 "pipe_connector_request_slice_threshold_bytes",
@@ -612,7 +612,7 @@ public class PipeDescriptor {
             "pipe_async_connector_selector_number",
             isHotModify);
     if (value != null) {
-      config.setPipeAsyncConnectorSelectorNumber(Integer.parseInt(value));
+      config.setPipeAsyncSinkSelectorNumber(Integer.parseInt(value));
     }
 
     value =
@@ -622,7 +622,7 @@ public class PipeDescriptor {
             "pipe_async_connector_max_client_number",
             isHotModify);
     if (value != null) {
-      config.setPipeAsyncConnectorMaxClientNumber(Integer.parseInt(value));
+      config.setPipeAsyncSinkMaxClientNumber(Integer.parseInt(value));
     }
 
     value =
@@ -632,7 +632,7 @@ public class PipeDescriptor {
             "pipe_async_connector_max_tsfile_client_number",
             isHotModify);
     if (value != null) {
-      
config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value));
+      config.setPipeAsyncSinkMaxTsFileClientNumber(Integer.parseInt(value));
     }
 
     value =

Reply via email to