This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2df8710feef Pipe: Cleaned some questionable parameters & Fixed 
unstable testPipeAfterDataRegionLeaderStop (#16555)
2df8710feef is described below

commit 2df8710feef9b2511bee444aa587c7fec99cc5a2
Author: Caideyipi <[email protected]>
AuthorDate: Mon Oct 13 09:45:49 2025 +0800

    Pipe: Cleaned some questionable parameters & Fixed unstable 
testPipeAfterDataRegionLeaderStop (#16555)
    
    * clean
    
    * move
    
    * fix
    
    * Update CommonConfig.java
    
    * s-cle
    
    * next
    
    * fix
    
    * fix
    
    * fix-unstable
    
    * fix
---
 .../manual/enhanced/IoTDBPipeClusterIT.java        | 342 ++++++++++-----------
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  14 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  14 +-
 .../metadata/AlignedTimeseriesException.java       |  33 --
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |   4 +-
 .../scan/TsFileInsertionEventScanParser.java       |   3 +-
 .../table/TsFileInsertionEventTableParser.java     |   2 +-
 .../visitor/PipeStatementTSStatusVisitor.java      |   5 -
 .../db/pipe/resource/memory/PipeMemoryManager.java |   6 -
 .../analyze/load/LoadTsFileTableSchemaCache.java   |   3 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  63 +---
 .../iotdb/commons/conf/CommonDescriptor.java       |   2 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  53 ++--
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  20 +-
 15 files changed, 232 insertions(+), 333 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 2e8eca84adf..4350d4072bb 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -99,12 +99,12 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
 
   @Test
   public void testMachineDowntimeAsync() {
-    testMachineDowntime("iotdb-thrift-connector");
+    testMachineDowntime("iotdb-thrift-sink");
   }
 
   @Test
   public void testMachineDowntimeSync() {
-    testMachineDowntime("iotdb-thrift-sync-connector");
+    testMachineDowntime("iotdb-thrift-sync-sink");
   }
 
   private void testMachineDowntime(String sink) {
@@ -120,24 +120,24 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("extractor", "iotdb-extractor");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("source", "iotdb-source");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("user", "root");
 
       processorAttributes.put("processor", "do-nothing-processor");
 
-      connectorAttributes.put("connector", sink);
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.node-urls", a.toString());
+      sinkAttributes.put("sink", sink);
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.node-urls", a.toString());
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -188,32 +188,32 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
       TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
       TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
 
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("extractor", "iotdb-extractor");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("database-name", "test");
-      extractorAttributes.put("table-name", "test");
-      extractorAttributes.put("start-time", "0");
-      extractorAttributes.put("end-time", "199");
-      extractorAttributes.put("mode.streaming", realtimeMode);
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("source", "iotdb-source");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("database-name", "test");
+      sourceAttributes.put("table-name", "test");
+      sourceAttributes.put("start-time", "0");
+      sourceAttributes.put("end-time", "199");
+      sourceAttributes.put("mode.streaming", realtimeMode);
+      sourceAttributes.put("user", "root");
 
       processorAttributes.put("processor", "do-nothing-processor");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
-      connectorAttributes.put("connector.user", "root");
-      connectorAttributes.put("connector.password", "root");
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
+      sinkAttributes.put("sink.user", "root");
+      sinkAttributes.put("sink.password", "root");
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -241,9 +241,9 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
   }
 
   // This function has a certain probability of triggering replica asynchrony. 
To ensure the success
-  // of the test, it will be retried 5 times. The exception will be thrown 
after five retries.
+  // of the test, it will be retried 5 times. The test will be ignored after 
five retries.
   @Test
-  public void testPipeAfterDataRegionLeaderStop() throws Exception {
+  public void testPipeAfterDataRegionLeaderStop() {
     for (int retry = 0; retry < 5; retry++) {
       try {
         if (retry != 0) {
@@ -261,31 +261,31 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
 
         try (final SyncConfigNodeIServiceClient client =
             (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-          final Map<String, String> extractorAttributes = new HashMap<>();
+          final Map<String, String> sourceAttributes = new HashMap<>();
           final Map<String, String> processorAttributes = new HashMap<>();
-          final Map<String, String> connectorAttributes = new HashMap<>();
+          final Map<String, String> sinkAttributes = new HashMap<>();
           TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
           TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
           TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
           TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
 
-          extractorAttributes.put("extractor", "iotdb-extractor");
-          extractorAttributes.put("database-name", "test");
-          extractorAttributes.put("capture.table", "true");
-          extractorAttributes.put("table-name", "test");
-          extractorAttributes.put("start-time", "0");
-          extractorAttributes.put("end-time", "300");
-          extractorAttributes.put("user", "root");
+          sourceAttributes.put("source", "iotdb-source");
+          sourceAttributes.put("database-name", "test");
+          sourceAttributes.put("capture.table", "true");
+          sourceAttributes.put("table-name", "test");
+          sourceAttributes.put("start-time", "0");
+          sourceAttributes.put("end-time", "300");
+          sourceAttributes.put("user", "root");
 
-          connectorAttributes.put("connector", "iotdb-thrift-connector");
-          connectorAttributes.put("connector.batch.enable", "false");
-          connectorAttributes.put("connector.ip", receiverIp);
-          connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+          sinkAttributes.put("sink", "iotdb-thrift-sink");
+          sinkAttributes.put("sink.batch.enable", "false");
+          sinkAttributes.put("sink.ip", receiverIp);
+          sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
           final TSStatus status =
               client.createPipe(
-                  new TCreatePipeReq("p1", connectorAttributes)
-                      .setExtractorAttributes(extractorAttributes)
+                  new TCreatePipeReq("p1", sinkAttributes)
+                      .setExtractorAttributes(sourceAttributes)
                       .setProcessorAttributes(processorAttributes));
 
           Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -353,26 +353,26 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
         try (final SyncConfigNodeIServiceClient client =
             (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
           // Create a new pipe and write new data
-          final Map<String, String> extractorAttributes = new HashMap<>();
+          final Map<String, String> sourceAttributes = new HashMap<>();
           final Map<String, String> processorAttributes = new HashMap<>();
-          final Map<String, String> connectorAttributes = new HashMap<>();
+          final Map<String, String> sinkAttributes = new HashMap<>();
 
-          extractorAttributes.put("database-name", "test1");
-          extractorAttributes.put("capture.table", "true");
-          extractorAttributes.put("table-name", "test1");
-          extractorAttributes.put("start-time", "0");
-          extractorAttributes.put("end-time", "300");
-          extractorAttributes.put("user", "root");
+          sourceAttributes.put("database-name", "test1");
+          sourceAttributes.put("capture.table", "true");
+          sourceAttributes.put("table-name", "test1");
+          sourceAttributes.put("start-time", "0");
+          sourceAttributes.put("end-time", "300");
+          sourceAttributes.put("user", "root");
 
-          connectorAttributes.put("connector", "iotdb-thrift-connector");
-          connectorAttributes.put("connector.batch.enable", "false");
-          connectorAttributes.put("connector.ip", receiverIp);
-          connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+          sinkAttributes.put("sink", "iotdb-thrift-sink");
+          sinkAttributes.put("sink.batch.enable", "false");
+          sinkAttributes.put("sink.ip", receiverIp);
+          sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
           final TSStatus status =
               client.createPipe(
-                  new TCreatePipeReq("p2", connectorAttributes)
-                      .setExtractorAttributes(extractorAttributes)
+                  new TCreatePipeReq("p2", sinkAttributes)
+                      .setExtractorAttributes(sourceAttributes)
                       .setProcessorAttributes(processorAttributes));
 
           Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -387,11 +387,9 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
           TableModelUtils.assertData("test1", "test1", 0, 301, receiverEnv, 
handleFailure);
         }
         return;
-      } catch (Exception | Error e) {
+      } catch (final Exception e) {
         if (retry < 4) {
           this.tearDown();
-        } else {
-          throw e;
         }
       }
     }
@@ -411,9 +409,9 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
 
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
       TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
       TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
@@ -421,20 +419,20 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
 
       TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
 
-      extractorAttributes.put("database-name", "test");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("table-name", "test");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("database-name", "test");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("table-name", "test");
+      sourceAttributes.put("user", "root");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -470,24 +468,24 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
       // create a new pipe and write new data
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("database-name", "test1");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("table-name", "test1");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("database-name", "test1");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("table-name", "test1");
+      sourceAttributes.put("user", "root");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p2", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p2", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -513,19 +511,19 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("database-name", "test1");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("table-name", "test1");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("database-name", "test1");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("table-name", "test1");
+      sourceAttributes.put("user", "root");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
       final Thread t =
           new Thread(
@@ -533,8 +531,8 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
                 for (int i = 0; i < 30; ++i) {
                   try {
                     client.createPipe(
-                        new TCreatePipeReq("p" + i, connectorAttributes)
-                            .setExtractorAttributes(extractorAttributes)
+                        new TCreatePipeReq("p" + i, sinkAttributes)
+                            .setExtractorAttributes(sourceAttributes)
                             .setProcessorAttributes(processorAttributes));
                   } catch (final TException e) {
                     // Not sure if the "createPipe" has succeeded
@@ -584,24 +582,24 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
       TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
       TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
 
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("database-name", "test");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("table-name", "test");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("database-name", "test");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("table-name", "test");
+      sourceAttributes.put("user", "root");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -663,24 +661,24 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
 
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("database-name", "test");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("table-name", "test");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("database-name", "test");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("table-name", "test");
+      sourceAttributes.put("user", "root");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -730,24 +728,24 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
       TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
       TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
 
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("database-name", "test");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("table-name", "test");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("database-name", "test");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("table-name", "test");
+      sourceAttributes.put("user", "root");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -780,19 +778,19 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
     final String receiverIp = receiverDataNode.getIp();
     final int receiverPort = receiverDataNode.getPort();
 
-    final Map<String, String> extractorAttributes = new HashMap<>();
+    final Map<String, String> sourceAttributes = new HashMap<>();
     final Map<String, String> processorAttributes = new HashMap<>();
-    final Map<String, String> connectorAttributes = new HashMap<>();
+    final Map<String, String> sinkAttributes = new HashMap<>();
 
-    extractorAttributes.put("database-name", "test");
-    extractorAttributes.put("capture.table", "true");
-    extractorAttributes.put("table-name", "test");
-    extractorAttributes.put("user", "root");
+    sourceAttributes.put("database-name", "test");
+    sourceAttributes.put("capture.table", "true");
+    sourceAttributes.put("table-name", "test");
+    sourceAttributes.put("user", "root");
 
-    connectorAttributes.put("connector", "iotdb-thrift-connector");
-    connectorAttributes.put("connector.batch.enable", "false");
-    connectorAttributes.put("connector.ip", receiverIp);
-    connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+    sinkAttributes.put("sink", "iotdb-thrift-sink");
+    sinkAttributes.put("sink.batch.enable", "false");
+    sinkAttributes.put("sink.ip", receiverIp);
+    sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
     final AtomicInteger successCount = new AtomicInteger(0);
     final List<Thread> threads = new ArrayList<>();
@@ -804,8 +802,8 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
                     (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
                   final TSStatus status =
                       client.createPipe(
-                          new TCreatePipeReq("p1", connectorAttributes)
-                              .setExtractorAttributes(extractorAttributes)
+                          new TCreatePipeReq("p1", sinkAttributes)
+                              .setExtractorAttributes(sourceAttributes)
                               .setProcessorAttributes(processorAttributes));
                   if (status.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                     successCount.incrementAndGet();
@@ -889,19 +887,19 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
     final String receiverIp = receiverDataNode.getIp();
     final int receiverPort = receiverDataNode.getPort();
 
-    final Map<String, String> extractorAttributes = new HashMap<>();
+    final Map<String, String> sourceAttributes = new HashMap<>();
     final Map<String, String> processorAttributes = new HashMap<>();
-    final Map<String, String> connectorAttributes = new HashMap<>();
+    final Map<String, String> sinkAttributes = new HashMap<>();
 
-    extractorAttributes.put("database-name", "test");
-    extractorAttributes.put("capture.table", "true");
-    extractorAttributes.put("table-name", "test");
-    extractorAttributes.put("user", "root");
+    sourceAttributes.put("database-name", "test");
+    sourceAttributes.put("capture.table", "true");
+    sourceAttributes.put("table-name", "test");
+    sourceAttributes.put("user", "root");
 
-    connectorAttributes.put("connector", "iotdb-thrift-connector");
-    connectorAttributes.put("connector.batch.enable", "false");
-    connectorAttributes.put("connector.ip", receiverIp);
-    connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+    sinkAttributes.put("sink", "iotdb-thrift-sink");
+    sinkAttributes.put("sink.batch.enable", "false");
+    sinkAttributes.put("sink.ip", receiverIp);
+    sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
     final AtomicInteger successCount = new AtomicInteger(0);
     final List<Thread> threads = new ArrayList<>();
@@ -914,8 +912,8 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
                     (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
                   final TSStatus status =
                       client.createPipe(
-                          new TCreatePipeReq("p" + finalI, connectorAttributes)
-                              .setExtractorAttributes(extractorAttributes)
+                          new TCreatePipeReq("p" + finalI, sinkAttributes)
+                              .setExtractorAttributes(sourceAttributes)
                               .setProcessorAttributes(processorAttributes));
                   Assert.assertEquals(
                       TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
@@ -970,26 +968,26 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
       TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
       TableModelUtils.insertData("test", "test", -100, 100, senderEnv);
 
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("extractor", "iotdb-extractor");
-      extractorAttributes.put("database-name", "test");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("table-name", "test");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("source", "iotdb-source");
+      sourceAttributes.put("database-name", "test");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("table-name", "test");
+      sourceAttributes.put("user", "root");
 
       processorAttributes.put("processor", "do-nothing-processor");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b19221e1b80..fc738783483 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -106,6 +106,7 @@ public enum TSStatusCode {
   WRITE_PROCESS_REJECT(606),
   OUT_OF_TTL(607),
   COMPACTION_ERROR(608),
+  @Deprecated
   ALIGNED_TIMESERIES_ERROR(609),
   WAL_ERROR(610),
   DISK_SPACE_INSUFFICIENT(611),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4786460bac2..64d0dc6d214 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1150,6 +1150,8 @@ public class IoTDBConfig {
   private String loadDiskSelectStrategyForIoTV2AndPipe =
       LoadDiskSelectorType.INHERIT_LOAD.getValue();
 
+  private boolean skipFailedTableSchemaCheck = false;
+
   /** Pipe related */
   /** initialized as empty, updated based on the latest `systemDir` during 
querying */
   private String[] pipeReceiverFileDirs = new String[0];
@@ -3932,6 +3934,18 @@ public class IoTDBConfig {
     this.loadDiskSelectStrategyForIoTV2AndPipe = 
loadDiskSelectStrategyForIoTV2AndPipe;
   }
 
+  public boolean isSkipFailedTableSchemaCheck() {
+    return skipFailedTableSchemaCheck;
+  }
+
+  public void setSkipFailedTableSchemaCheck(boolean 
skipFailedTableSchemaCheck) {
+    if (this.skipFailedTableSchemaCheck == skipFailedTableSchemaCheck) {
+      return;
+    }
+    this.skipFailedTableSchemaCheck = skipFailedTableSchemaCheck;
+    logger.info("skipFailedTableSchemaCheck is set to {}.", 
skipFailedTableSchemaCheck);
+  }
+
   public long getLoadActiveListeningCheckIntervalSeconds() {
     return loadActiveListeningCheckIntervalSeconds;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 0fa4aaf90e7..c5adbf2512f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2229,7 +2229,7 @@ public class IoTDBDescriptor {
     }
   }
 
-  private void loadLoadTsFileProps(TrimProperties properties) throws 
IOException {
+  private void loadLoadTsFileProps(TrimProperties properties) {
     conf.setMaxAllocateMemoryRatioForLoad(
         Double.parseDouble(
             properties.getProperty(
@@ -2380,6 +2380,12 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "cache_last_values_memory_budget_in_byte",
                 String.valueOf(conf.getCacheLastValuesMemoryBudgetInByte()))));
+
+    conf.setSkipFailedTableSchemaCheck(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "skip_failed_table_schema_check",
+                String.valueOf(conf.isSkipFailedTableSchemaCheck()))));
   }
 
   private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws 
IOException {
@@ -2427,6 +2433,12 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "load_tsfile_split_partition_max_size",
                 Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize()))));
+
+    conf.setSkipFailedTableSchemaCheck(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "skip_failed_table_schema_check",
+                String.valueOf(conf.isSkipFailedTableSchemaCheck()))));
   }
 
   private void loadPipeHotModifiedProp(TrimProperties properties) throws 
IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
deleted file mode 100644
index f11c986afdc..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.exception.metadata;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-public class AlignedTimeseriesException extends MetadataException {
-
-  public AlignedTimeseriesException(String message, String path) {
-    super(
-        String.format("%s (Path: %s)", message, path),
-        TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(),
-        true);
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index c4044a7213e..c51d88185ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -786,12 +786,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     final long remainingMemory =
         
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes()
             - allocatedMemorySizeInBytes;
-    if (remainingMemory < 
PipeConfig.getInstance().PipeInsertNodeQueueMemory()) {
+    if (remainingMemory < 
PipeConfig.getInstance().getPipeInsertNodeQueueMemory()) {
       final String message =
           String.format(
               "%s Need Floating memory: %d  bytes, free Floating memory: %d 
bytes",
               MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
-              PipeConfig.getInstance().PipeInsertNodeQueueMemory(),
+              PipeConfig.getInstance().getPipeInsertNodeQueueMemory(),
               remainingMemory);
       LOGGER.warn(message);
       throw new PipeException(message);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 47aba940a72..44941e34c3a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -110,8 +110,7 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
                 
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
     this.allocatedMemoryBlockForChunk =
         PipeDataNodeResourceManager.memory()
-            .forceAllocateForTabletWithRetry(
-                
PipeConfig.getInstance().getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+            
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
 
     try {
       tsFileSequenceReader = new 
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 7a5d869aa1a..76cc32ef347 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -77,7 +77,7 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
       this.allocatedMemoryBlockForChunk =
           PipeDataNodeResourceManager.memory()
               .forceAllocateForTabletWithRetry(
-                  
PipeConfig.getInstance().getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+                  PipeConfig.getInstance().getPipeMaxReaderChunkSize());
       this.allocatedMemoryBlockForBatchData =
           
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
       this.allocatedMemoryBlockForChunkMeta =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 8d7a567fce0..882485a6c72 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -133,7 +133,6 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     } else if (context.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
-        || context.getCode() == 
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()
         || context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
@@ -168,10 +167,6 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
             && status.getCode() != 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
-          if (status.getCode() == 
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
-            return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-                .setMessage(context.getMessage());
-          }
           return visitStatement(statement, context);
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 6e43d4bf53a..ada788363d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -638,12 +638,6 @@ public class PipeMemoryManager {
     return usedMemorySizeInBytesOfTsFiles;
   }
 
-  public long getAllocatedMemorySizeInBytesOfBatch() {
-    return (long)
-        (PipeConfig.getInstance().getPipeDataStructureBatchMemoryProportion()
-            * getTotalNonFloatingMemorySizeInBytes());
-  }
-
   public long getFreeMemorySizeInBytes() {
     return memoryBlock.getFreeMemoryInBytes();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index 18b5d4cbc32..690a18dbd46 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PatternTreeMap;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -150,7 +149,7 @@ public class LoadTsFileTableSchemaCache {
     try {
       createTableAndDatabaseIfNecessary(device.getTableName());
     } catch (final Exception e) {
-      if (PipeConfig.getInstance().isSkipFailedTableSchemaCheck()) {
+      if 
(IoTDBDescriptor.getInstance().getConfig().isSkipFailedTableSchemaCheck()) {
         LOGGER.info(
             "Failed to check table schema, will skip because 
skipFailedTableSchemaCheck is set to true, message: {}",
             e.getMessage());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c735e8d1a1d..43a637d5c2c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -209,7 +209,6 @@ public class CommonConfig {
   // Sequentially poll the tsFile by default
   private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
   private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
-  private boolean skipFailedTableSchemaCheck = false;
 
   /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor. */
   private int pipeSubtaskExecutorMaxThreadNum =
@@ -219,7 +218,6 @@ public class CommonConfig {
   private int pipeDataStructureTabletSizeInBytes = 2097152;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.3;
   private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 
0.3;
-  private double PipeDataStructureBatchMemoryProportion = 0.2;
   private volatile double pipeTotalFloatingMemoryProportion = 0.5;
 
   // Check if memory check is enabled for Pipe
@@ -255,7 +253,6 @@ public class CommonConfig {
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
 
   private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
-  private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
 
   private long pipeMaxWaitFinishTime = 10 * 1000;
 
@@ -320,7 +317,7 @@ public class CommonConfig {
   private volatile long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 
3Min
   private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
   private volatile float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
-  private volatile long pipeMaxAlignedSeriesChunkSizeInOneBatch = (long) 16 * 
1024 * 1024; // 16MB;
+  private volatile long pipeMaxReaderChunkSize = 16 * MB; // 16MB;
   private volatile long pipeListeningQueueTransferSnapshotThreshold = 1000;
   private volatile int pipeSnapshotExecutionMaxBatchSize = 1000;
   private volatile long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
@@ -890,21 +887,6 @@ public class CommonConfig {
         pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);
   }
 
-  public double getPipeDataStructureBatchMemoryProportion() {
-    return PipeDataStructureBatchMemoryProportion;
-  }
-
-  public void setPipeDataStructureBatchMemoryProportion(
-      double PipeDataStructureBatchMemoryProportion) {
-    if (this.PipeDataStructureBatchMemoryProportion == 
PipeDataStructureBatchMemoryProportion) {
-      return;
-    }
-    this.PipeDataStructureBatchMemoryProportion = 
PipeDataStructureBatchMemoryProportion;
-    logger.info(
-        "PipeDataStructureBatchMemoryProportion is set to {}.",
-        PipeDataStructureBatchMemoryProportion);
-  }
-
   public boolean isPipeEnableMemoryChecked() {
     return isPipeEnableMemoryCheck;
   }
@@ -1452,22 +1434,6 @@ public class CommonConfig {
         pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds);
   }
 
-  public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
-    return pipeSubtaskExecutorForcedRestartIntervalMs;
-  }
-
-  public void setPipeSubtaskExecutorForcedRestartIntervalMs(
-      long pipeSubtaskExecutorForcedRestartIntervalMs) {
-    if (this.pipeSubtaskExecutorForcedRestartIntervalMs
-        == pipeSubtaskExecutorForcedRestartIntervalMs) {
-      return;
-    }
-    this.pipeSubtaskExecutorForcedRestartIntervalMs = 
pipeSubtaskExecutorForcedRestartIntervalMs;
-    logger.info(
-        "pipeSubtaskExecutorForcedRestartIntervalMs is set to {}",
-        pipeSubtaskExecutorForcedRestartIntervalMs);
-  }
-
   public long getPipeMaxWaitFinishTime() {
     return pipeMaxWaitFinishTime;
   }
@@ -1523,18 +1489,6 @@ public class CommonConfig {
         pipeRealTimeQueueMaxWaitingTsFileSize);
   }
 
-  public boolean isSkipFailedTableSchemaCheck() {
-    return skipFailedTableSchemaCheck;
-  }
-
-  public void setSkipFailedTableSchemaCheck(boolean 
skipFailedTableSchemaCheck) {
-    if (this.skipFailedTableSchemaCheck == skipFailedTableSchemaCheck) {
-      return;
-    }
-    this.skipFailedTableSchemaCheck = skipFailedTableSchemaCheck;
-    logger.info("skipFailedTableSchemaCheck is set to {}.", 
skipFailedTableSchemaCheck);
-  }
-
   public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
     if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
       return;
@@ -1798,19 +1752,16 @@ public class CommonConfig {
         "pipeLeaderCacheMemoryUsagePercentage is set to {}", 
pipeLeaderCacheMemoryUsagePercentage);
   }
 
-  public long getPipeMaxAlignedSeriesChunkSizeInOneBatch() {
-    return pipeMaxAlignedSeriesChunkSizeInOneBatch;
+  public long getPipeMaxReaderChunkSize() {
+    return pipeMaxReaderChunkSize;
   }
 
-  public void setPipeMaxAlignedSeriesChunkSizeInOneBatch(
-      long pipeMaxAlignedSeriesChunkSizeInOneBatch) {
-    if (this.pipeMaxAlignedSeriesChunkSizeInOneBatch == 
pipeMaxAlignedSeriesChunkSizeInOneBatch) {
+  public void setPipeMaxReaderChunkSize(long pipeMaxReaderChunkSize) {
+    if (this.pipeMaxReaderChunkSize == pipeMaxReaderChunkSize) {
       return;
     }
-    this.pipeMaxAlignedSeriesChunkSizeInOneBatch = 
pipeMaxAlignedSeriesChunkSizeInOneBatch;
-    logger.info(
-        "pipeMaxAlignedSeriesChunkSizeInOneBatch is set to {}",
-        pipeMaxAlignedSeriesChunkSizeInOneBatch);
+    this.pipeMaxReaderChunkSize = pipeMaxReaderChunkSize;
+    logger.info("pipeMaxReaderChunkSize is set to {}", pipeMaxReaderChunkSize);
   }
 
   public long getPipeListeningQueueTransferSnapshotThreshold() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 0957317aa59..55860d2b53c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -398,7 +398,7 @@ public class CommonDescriptor {
         Float.parseFloat(
             properties.getProperty(
                 "subscription_prefetch_missing_rate_threshold",
-                
String.valueOf(config.getSubscriptionPrefetchMemoryThreshold()))));
+                
String.valueOf(config.getSubscriptionPrefetchMissingRateThreshold()))));
     config.setSubscriptionPrefetchEventLocalCountThreshold(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 37d9af0f3e6..4e840d51c55 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -69,15 +69,13 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion();
   }
 
-  public double getPipeDataStructureBatchMemoryProportion() {
-    return COMMON_CONFIG.getPipeDataStructureBatchMemoryProportion();
-  }
+  /////////////////////////////// Estimation ///////////////////////////////
 
   public boolean isPipeEnableMemoryCheck() {
     return COMMON_CONFIG.isPipeEnableMemoryChecked();
   }
 
-  public long PipeInsertNodeQueueMemory() {
+  public long getPipeInsertNodeQueueMemory() {
     return COMMON_CONFIG.getPipeInsertNodeQueueMemory();
   }
 
@@ -119,10 +117,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
   }
 
-  public boolean isSkipFailedTableSchemaCheck() {
-    return COMMON_CONFIG.isSkipFailedTableSchemaCheck();
-  }
-
   /////////////////////////////// Subtask Executor 
///////////////////////////////
 
   public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -145,10 +139,6 @@ public class PipeConfig {
     return 
COMMON_CONFIG.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
   }
 
-  public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
-    return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs();
-  }
-
   public long getPipeMaxWaitFinishTime() {
     return COMMON_CONFIG.getPipeMaxWaitFinishTime();
   }
@@ -237,20 +227,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
   }
 
-  public float getPipeLeaderCacheMemoryUsagePercentage() {
-    return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
-  }
-
-  public long getPipeMaxAlignedSeriesChunkSizeInOneBatch() {
-    return COMMON_CONFIG.getPipeMaxAlignedSeriesChunkSizeInOneBatch();
-  }
-
-  public long getPipeListeningQueueTransferSnapshotThreshold() {
-    return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
-  }
-
-  public int getPipeSnapshotExecutionMaxBatchSize() {
-    return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+  public long getPipeMaxReaderChunkSize() {
+    return COMMON_CONFIG.getPipeMaxReaderChunkSize();
   }
 
   public long getPipeRemainingTimeCommitAutoSwitchSeconds() {
@@ -411,6 +389,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeCheckMemoryEnoughIntervalMs();
   }
 
+  public float getPipeLeaderCacheMemoryUsagePercentage() {
+    return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
+  }
+
   /////////////////////////////// TwoStage ///////////////////////////////
 
   public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
@@ -425,6 +407,16 @@ public class PipeConfig {
     return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
   }
 
+  /////////////////////////////// Meta ///////////////////////////////
+
+  public long getPipeListeningQueueTransferSnapshotThreshold() {
+    return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
+  }
+
+  public int getPipeSnapshotExecutionMaxBatchSize() {
+    return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+  }
+
   /////////////////////////////// Ref ///////////////////////////////
 
   public boolean getPipeEventReferenceTrackingEnabled() {
@@ -460,8 +452,6 @@ public class PipeConfig {
         getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold());
     LOGGER.info("PipeTotalFloatingMemoryProportion: {}", 
getPipeTotalFloatingMemoryProportion());
 
-    LOGGER.info(
-        "PipeDataStructureBatchMemoryProportion: {}", 
getPipeDataStructureBatchMemoryProportion());
     LOGGER.info("IsPipeEnableMemoryCheck: {}", isPipeEnableMemoryCheck());
     LOGGER.info("PipeTsFileParserMemory: {}", getTsFileParserMemory());
     LOGGER.info("SinkBatchMemoryInsertNode: {}", 
getSinkBatchMemoryInsertNode());
@@ -489,9 +479,6 @@ public class PipeConfig {
     LOGGER.info(
         "PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
         getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
-    LOGGER.info(
-        "PipeSubtaskExecutorForcedRestartIntervalMs: {}",
-        getPipeSubtaskExecutorForcedRestartIntervalMs());
     LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
 
     LOGGER.info(
@@ -514,9 +501,7 @@ public class PipeConfig {
         isPipeConnectorRPCThriftCompressionEnabled());
     LOGGER.info(
         "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
-    LOGGER.info(
-        "PipeMaxAlignedSeriesChunkSizeInOneBatch: {}",
-        getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+    LOGGER.info("PipeMaxAlignedSeriesChunkSizeInOneBatch: {}", 
getPipeMaxReaderChunkSize());
     LOGGER.info(
         "PipeListeningQueueTransferSnapshotThreshold: {}",
         getPipeListeningQueueTransferSnapshotThreshold());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 2a170b6b8d1..760a7b5e51f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -210,11 +210,6 @@ public class PipeDescriptor {
                 
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
                 String.valueOf(
                     
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
-    config.setPipeDataStructureBatchMemoryProportion(
-        Double.parseDouble(
-            properties.getProperty(
-                "pipe_data_structure_batch_memory_proportion",
-                
String.valueOf(config.getPipeDataStructureBatchMemoryProportion()))));
     config.setPipeTotalFloatingMemoryProportion(
         Double.parseDouble(
             properties.getProperty(
@@ -278,11 +273,6 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_realTime_queue_max_waiting_tsFile_size",
                 
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
-    config.setSkipFailedTableSchemaCheck(
-        Boolean.parseBoolean(
-            properties.getProperty(
-                "skip_failed_table_schema_check",
-                String.valueOf(config.isSkipFailedTableSchemaCheck()))));
     config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
         Integer.parseInt(
             properties.getProperty(
@@ -300,11 +290,6 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
                 
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
-    config.setPipeSubtaskExecutorForcedRestartIntervalMs(
-        Long.parseLong(
-            properties.getProperty(
-                "pipe_subtask_executor_forced_restart_interval_ms",
-                
String.valueOf(config.getPipeSubtaskExecutorForcedRestartIntervalMs()))));
 
     config.setPipeExtractorAssignerDisruptorRingBufferSize(
         Integer.parseInt(
@@ -488,11 +473,10 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_leader_cache_memory_usage_percentage",
                 
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
-    config.setPipeMaxAlignedSeriesChunkSizeInOneBatch(
+    config.setPipeMaxReaderChunkSize(
         Long.parseLong(
             properties.getProperty(
-                "pipe_max_aligned_series_chunk_size_in_one_batch",
-                
String.valueOf(config.getPipeMaxAlignedSeriesChunkSizeInOneBatch()))));
+                "pipe_max_reader_chunk_size", 
String.valueOf(config.getPipeMaxReaderChunkSize()))));
 
     config.setPipeTransferTsFileSync(
         Boolean.parseBoolean(

Reply via email to