This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 00fc9b5708b Pipe: rename extractor -> source, connector -> sink, sloppy-time-range -> loose-range (#11406)
00fc9b5708b is described below

commit 00fc9b5708bd5cfbbcd971055c53b1141b8c3f32
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Oct 29 17:35:23 2023 +0800

    Pipe: rename extractor -> source, connector -> sink, sloppy-time-range -> loose-range (#11406)
---
 .../api/customizer/parameter/PipeParameters.java   | 135 ++++++++++++++++++---
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |   2 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   4 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   8 ++
 .../persistence/pipe/PipePluginInfo.java           |   9 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      |  11 +-
 .../config/constant/PipeConnectorConstant.java     |  18 +++
 .../config/constant/PipeExtractorConstant.java     |  33 +++--
 .../builder/PipeTransferBatchReqBuilder.java       |   9 +-
 .../db/pipe/connector/protocol/IoTDBConnector.java |  40 +++++-
 .../protocol/airgap/IoTDBAirGapConnector.java      |   8 +-
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  |  37 ++++--
 .../connector/protocol/opcua/OpcUaConnector.java   |  22 +++-
 .../protocol/websocket/WebSocketConnector.java     |   5 +-
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |  81 ++++++++-----
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  62 ++++++++--
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  10 +-
 .../db/pipe/task/stage/PipeTaskExtractorStage.java |   5 +-
 .../connector/PipeConnectorSubtaskManager.java     |   7 +-
 .../plan/analyze/LoadTsfileAnalyzer.java           |   5 +-
 .../extractor/IoTDBDataRegionExtractorTest.java    |   8 +-
 21 files changed, 411 insertions(+), 108 deletions(-)

diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index cfea33aa3b9..2d00ce717b8 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.pipe.api.PipeProcessor;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -51,33 +52,73 @@ public class PipeParameters {
     return attributes.containsKey(key);
   }
 
-  public String getString(String key) {
-    return attributes.get(key);
+  public boolean hasAnyAttributes(String... keys) {
+    for (final String key : keys) {
+      if (attributes.containsKey(key)) {
+        return true;
+      }
+    }
+    return false;
   }
 
-  public Boolean getBoolean(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Boolean.parseBoolean(value);
+  public String getString(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return value;
+      }
+    }
+    return null;
   }
 
-  public Integer getInt(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Integer.parseInt(value);
+  public Boolean getBoolean(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Boolean.parseBoolean(value);
+      }
+    }
+    return null;
   }
 
-  public Long getLong(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Long.parseLong(value);
+  public Integer getInt(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Integer.parseInt(value);
+      }
+    }
+    return null;
   }
 
-  public Float getFloat(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Float.parseFloat(value);
+  public Long getLong(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Long.parseLong(value);
+      }
+    }
+    return null;
   }
 
-  public Double getDouble(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Double.parseDouble(value);
+  public Float getFloat(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Float.parseFloat(value);
+      }
+    }
+    return null;
+  }
+
+  public Double getDouble(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Double.parseDouble(value);
+      }
+    }
+    return null;
   }
 
   public String getStringOrDefault(String key, String defaultValue) {
@@ -110,6 +151,66 @@ public class PipeParameters {
     return value == null ? defaultValue : Double.parseDouble(value);
   }
 
+  public String getStringOrDefault(List<String> keys, String defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return value;
+      }
+    }
+    return defaultValue;
+  }
+
+  public boolean getBooleanOrDefault(List<String> keys, boolean defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Boolean.parseBoolean(value);
+      }
+    }
+    return defaultValue;
+  }
+
+  public int getIntOrDefault(List<String> keys, int defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Integer.parseInt(value);
+      }
+    }
+    return defaultValue;
+  }
+
+  public long getLongOrDefault(List<String> keys, long defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Long.parseLong(value);
+      }
+    }
+    return defaultValue;
+  }
+
+  public float getFloatOrDefault(List<String> keys, float defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Float.parseFloat(value);
+      }
+    }
+    return defaultValue;
+  }
+
+  public double getDoubleOrDefault(List<String> keys, double defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Double.parseDouble(value);
+      }
+    }
+    return defaultValue;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index af97d6e2047..e92b7b99dfa 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -198,8 +198,10 @@ keyWords
     | SETTLE
     | SGLEVEL
     | SHOW
+    | SINK
     | SLIMIT
     | SOFFSET
+    | SOURCE
     | SPACE
     | STORAGE
     | START
diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index dc4235e4013..fefed050b27 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -521,7 +521,7 @@ createPipe
     ;
 
 extractorAttributesClause
-    : WITH EXTRACTOR
+    : WITH (EXTRACTOR | SOURCE)
         LR_BRACKET
         (extractorAttributeClause COMMA)* extractorAttributeClause?
         RR_BRACKET
@@ -543,7 +543,7 @@ processorAttributeClause
     ;
 
 connectorAttributesClause
-    : WITH CONNECTOR
+    : WITH (CONNECTOR | SINK)
         LR_BRACKET
         (connectorAttributeClause COMMA)* connectorAttributeClause?
         RR_BRACKET
diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index a1cf3cf6ad2..d98d9b1cbcd 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -690,6 +690,10 @@ SHOW
     : S H O W
     ;
 
+SINK
+    : S I N K
+    ;
+
 SLIMIT
     : S L I M I T
     ;
@@ -698,6 +702,10 @@ SOFFSET
     : S O F F S E T
     ;
 
+SOURCE
+    : S O U R C E
+    ;
+
 SPACE
     : S P A C E
     ;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index bdc30ddd683..85ab9f69adc 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -126,7 +126,8 @@ public class PipePluginInfo implements SnapshotProcessor {
         new PipeParameters(createPipeRequest.getExtractorAttributes());
     final String extractorPluginName =
         extractorParameters.getStringOrDefault(
-            PipeExtractorConstant.EXTRACTOR_KEY, 
IOTDB_EXTRACTOR.getPipePluginName());
+            Arrays.asList(PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
+            IOTDB_EXTRACTOR.getPipePluginName());
     if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) {
       final String exceptionMessage =
           String.format(
@@ -152,14 +153,16 @@ public class PipePluginInfo implements SnapshotProcessor {
 
     final PipeParameters connectorParameters =
         new PipeParameters(createPipeRequest.getConnectorAttributes());
-    if 
(!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+    if (!connectorParameters.hasAnyAttributes(
+        PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
       final String exceptionMessage =
           "Failed to create pipe, the pipe connector plugin is not specified";
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
     final String connectorPluginName =
-        connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY);
+        connectorParameters.getString(
+            PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY);
     if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
       final String exceptionMessage =
           String.format(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 59b18f77014..a7edab29de8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class PipePluginAgent {
@@ -203,7 +204,8 @@ public class PipePluginAgent {
     return (PipeExtractor)
         reflect(
             extractorParameters.getStringOrDefault(
-                PipeExtractorConstant.EXTRACTOR_KEY,
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
                 BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()));
   }
 
@@ -216,13 +218,16 @@ public class PipePluginAgent {
   }
 
   public PipeConnector reflectConnector(PipeParameters connectorParameters) {
-    if 
(!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+    if (!connectorParameters.hasAnyAttributes(
+        PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
       throw new PipeException(
           "Failed to reflect PipeConnector instance because "
               + "'connector' is not specified in the parameters.");
     }
     return (PipeConnector)
-        
reflect(connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY));
+        reflect(
+            connectorParameters.getString(
+                PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY));
   }
 
   private PipePlugin reflect(String pluginName) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 8ed5af7ed56..93e28a0a040 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -29,51 +29,69 @@ import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 public class PipeConnectorConstant {
 
   public static final String CONNECTOR_KEY = "connector";
+  public static final String SINK_KEY = "sink";
 
   public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
+  public static final String SINK_IOTDB_IP_KEY = "sink.ip";
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
+  public static final String SINK_IOTDB_PORT_KEY = "sink.port";
   public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = 
"connector.node-urls";
+  public static final String SINK_IOTDB_NODE_URLS_KEY = "sink.node-urls";
 
   public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY = 
"connector.parallel.tasks";
+  public static final String SINK_IOTDB_PARALLEL_TASKS_KEY = 
"sink.parallel.tasks";
   public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
       PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
 
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = 
"connector.batch.enable";
+  public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = 
"sink.batch.enable";
   public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE 
= true;
 
   public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = 
"connector.batch.max-delay-seconds";
+  public static final String SINK_IOTDB_BATCH_DELAY_KEY = 
"sink.batch.max-delay-seconds";
   public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1;
 
   public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = 
"connector.batch.size-bytes";
+  public static final String SINK_IOTDB_BATCH_SIZE_KEY = 
"sink.batch.size-bytes";
   public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
 
   public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
+  public static final String SINK_IOTDB_USER_KEY = "sink.user";
   public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
 
   public static final String CONNECTOR_IOTDB_PASSWORD_KEY = 
"connector.password";
+  public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password";
   public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
 
   public static final String CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY =
       "connector.air-gap.e-language.enable";
+  public static final String SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY = 
"sink.air-gap.e-language.enable";
   public static final boolean 
CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE = false;
 
   public static final String CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY =
       "connector.air-gap.handshake-timeout-ms";
+  public static final String SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY =
+      "sink.air-gap.handshake-timeout-ms";
   public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE 
= 5000;
 
   public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = 
"connector.version";
+  public static final String SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY = 
"sink.version";
   public static final String 
CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
 
   public static final String CONNECTOR_WEBSOCKET_PORT_KEY = 
"connector.websocket.port";
+  public static final String SINK_WEBSOCKET_PORT_KEY = "sink.websocket.port";
   public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080;
 
   public static final String CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY = 
"connector.opcua.tcp.port";
+  public static final String SINK_OPC_UA_TCP_BIND_PORT_KEY = 
"sink.opcua.tcp.port";
   public static final int CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686;
 
   public static final String CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY = 
"connector.opcua.https.port";
+  public static final String SINK_OPC_UA_HTTPS_BIND_PORT_KEY = 
"sink.opcua.https.port";
   public static final int CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE = 
8443;
 
   public static final String CONNECTOR_OPC_UA_SECURITY_DIR_KEY = 
"connector.opcua.security.dir";
+  public static final String SINK_OPC_UA_SECURITY_DIR_KEY = 
"sink.opcua.security.dir";
   public static final String CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE =
       IoTDBDescriptor.getInstance().getConfDir() != null
           ? IoTDBDescriptor.getInstance().getConfDir() + File.separatorChar + 
"opc_security"
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index b4be2ed34b9..a1eead9e6f5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -22,26 +22,37 @@ package org.apache.iotdb.db.pipe.config.constant;
 public class PipeExtractorConstant {
 
   public static final String EXTRACTOR_KEY = "extractor";
+  public static final String SOURCE_KEY = "source";
 
   public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
+  public static final String SOURCE_PATTERN_KEY = "source.pattern";
   public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root";
 
   public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY =
       "extractor.forwarding-pipe-requests";
+  public static final String SOURCE_FORWARDING_PIPE_REQUESTS_KEY =
+      "source.forwarding-pipe-requests";
   public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE 
= true;
 
   public static final String EXTRACTOR_HISTORY_ENABLE_KEY = 
"extractor.history.enable";
-  public static final String EXTRACTOR_HISTORY_START_TIME = 
"extractor.history.start-time";
-  public static final String EXTRACTOR_HISTORY_END_TIME = 
"extractor.history.end-time";
-  public static final String EXTRACTOR_HISTORY_SLOPPY_TIME_RANGE =
-      "extractor.history.sloppy-time-range";
-
-  public static final String EXTRACTOR_REALTIME_ENABLE = 
"extractor.realtime.enable";
-  public static final String EXTRACTOR_REALTIME_MODE = 
"extractor.realtime.mode";
-  public static final String EXTRACTOR_REALTIME_MODE_HYBRID = "hybrid";
-  public static final String EXTRACTOR_REALTIME_MODE_FILE = "file";
-  public static final String EXTRACTOR_REALTIME_MODE_LOG = "log";
-  public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG = "forced-log";
+  public static final String SOURCE_HISTORY_ENABLE_KEY = 
"source.history.enable";
+  public static final boolean EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE = true;
+  public static final String EXTRACTOR_HISTORY_START_TIME_KEY = 
"extractor.history.start-time";
+  public static final String SOURCE_HISTORY_START_TIME_KEY = 
"source.history.start-time";
+  public static final String EXTRACTOR_HISTORY_END_TIME_KEY = 
"extractor.history.end-time";
+  public static final String SOURCE_HISTORY_END_TIME_KEY = 
"source.history.end-time";
+  public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_KEY = 
"extractor.history.loose-range";
+  public static final String SOURCE_HISTORY_LOOSE_RANGE_KEY = 
"source.history.loose-range";
+
+  public static final String EXTRACTOR_REALTIME_ENABLE_KEY = 
"extractor.realtime.enable";
+  public static final String SOURCE_REALTIME_ENABLE_KEY = 
"source.realtime.enable";
+  public static final boolean EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE = true;
+  public static final String EXTRACTOR_REALTIME_MODE_KEY = 
"extractor.realtime.mode";
+  public static final String SOURCE_REALTIME_MODE_KEY = "source.realtime.mode";
+  public static final String EXTRACTOR_REALTIME_MODE_HYBRID_VALUE = "hybrid";
+  public static final String EXTRACTOR_REALTIME_MODE_FILE_VALUE = "file";
+  public static final String EXTRACTOR_REALTIME_MODE_LOG_VALUE = "log";
+  public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE = 
"forced-log";
 
   private PipeExtractorConstant() {
     throw new IllegalStateException("Utility class");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 7d3d3b38c23..9521e6a97aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -32,12 +32,15 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
 
 public abstract class PipeTransferBatchReqBuilder {
 
@@ -55,11 +58,13 @@ public abstract class PipeTransferBatchReqBuilder {
   protected PipeTransferBatchReqBuilder(PipeParameters parameters) {
     maxDelayInMs =
         parameters.getIntOrDefault(
-                CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
+                Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
+                CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
             * 1000;
     maxBatchSizeInBytes =
         parameters.getLongOrDefault(
-            CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
+            CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
   }
 
   public List<TPipeTransferReq> getTPipeTransferReqs() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index 1ca5db5ba15..3a2bae03068 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -40,6 +40,10 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
 
 public abstract class IoTDBConnector implements PipeConnector {
 
@@ -53,13 +57,25 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   public void validate(PipeParameterValidator validator) throws Exception {
     final PipeParameters parameters = validator.getParameters();
     validator.validate(
-        args -> (boolean) args[0] || ((boolean) args[1] && (boolean) args[2]),
+        args ->
+            (boolean) args[0]
+                || ((boolean) args[1] && (boolean) args[2])
+                || (boolean) args[3]
+                || ((boolean) args[4] && (boolean) args[5]),
         String.format(
-            "Either %s or %s:%s must be specified",
-            CONNECTOR_IOTDB_NODE_URLS_KEY, CONNECTOR_IOTDB_IP_KEY, 
CONNECTOR_IOTDB_PORT_KEY),
+            "One of %s, %s:%s, %s, %s:%s must be specified",
+            CONNECTOR_IOTDB_NODE_URLS_KEY,
+            CONNECTOR_IOTDB_IP_KEY,
+            CONNECTOR_IOTDB_PORT_KEY,
+            SINK_IOTDB_NODE_URLS_KEY,
+            SINK_IOTDB_IP_KEY,
+            SINK_IOTDB_PORT_KEY),
         parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
         parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
-        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY));
+        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+        parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
+        parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+        parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
   }
 
   @Override
@@ -75,19 +91,33 @@ public abstract class IoTDBConnector implements 
PipeConnector {
               parameters.getInt(CONNECTOR_IOTDB_PORT_KEY)));
     }
 
+    if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
+        && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
+      givenNodeUrls.add(
+          new TEndPoint(
+              parameters.getString(SINK_IOTDB_IP_KEY), 
parameters.getInt(SINK_IOTDB_PORT_KEY)));
+    }
+
     if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
       givenNodeUrls.addAll(
           SessionUtils.parseSeedNodeUrls(
               
Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
     }
 
+    if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
+      givenNodeUrls.addAll(
+          SessionUtils.parseSeedNodeUrls(
+              
Arrays.asList(parameters.getString(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+    }
+
     nodeUrls.clear();
     nodeUrls.addAll(givenNodeUrls);
     LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
 
     isTabletBatchModeEnabled =
         parameters.getBooleanOrDefault(
-            CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, 
CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, 
SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
+            CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
     LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index add1c166687..8beb6f4d9bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -64,6 +64,8 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
 
 public class IoTDBAirGapConnector extends IoTDBConnector {
 
@@ -98,14 +100,16 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
 
     handshakeTimeoutMs =
         parameters.getIntOrDefault(
-            CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY,
+            Arrays.asList(
+                CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY, 
SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY),
             CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE);
     LOGGER.info(
         "IoTDBAirGapConnector is customized with handshakeTimeoutMs: {}.", 
handshakeTimeoutMs);
 
     eLanguageEnable =
         parameters.getBooleanOrDefault(
-            CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY,
+            Arrays.asList(
+                CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY, 
SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY),
             CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE);
     LOGGER.info("IoTDBAirGapConnector is customized with eLanguageEnable: 
{}.", eLanguageEnable);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 2fa42762897..f8a05de6f42 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -58,6 +58,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
@@ -67,6 +68,11 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
 
 public class IoTDBLegacyPipeConnector implements PipeConnector {
 
@@ -91,26 +97,41 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    validator
-        .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY)
-        .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY);
+    final PipeParameters parameters = validator.getParameters();
+    validator.validate(
+        args ->
+            ((boolean) args[0] && (boolean) args[1]) || ((boolean) args[2] && 
(boolean) args[3]),
+        String.format(
+            "Either %s:%s or %s:%s must be specified",
+            CONNECTOR_IOTDB_IP_KEY,
+            CONNECTOR_IOTDB_PORT_KEY,
+            SINK_IOTDB_IP_KEY,
+            SINK_IOTDB_PORT_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+        parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+        parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
   }
 
   @Override
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
-    ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
-    port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
+    ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY, 
SINK_IOTDB_IP_KEY);
+    port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY, SINK_IOTDB_PORT_KEY);
 
     user =
-        parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, 
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+            CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
     password =
         parameters.getStringOrDefault(
-            CONNECTOR_IOTDB_PASSWORD_KEY, 
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, 
SINK_IOTDB_PASSWORD_KEY),
+            CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
 
     syncConnectorVersion =
         parameters.getStringOrDefault(
-            CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY,
+            Arrays.asList(
+                CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY, 
SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY),
             CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE);
 
     pipeName = configuration.getRuntimeEnvironment().getPipeName();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index cdfcb044392..dad75907cd9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -45,6 +45,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -60,6 +61,11 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
 
 /**
  * Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data are converted into
@@ -86,19 +92,25 @@ public class OpcUaConnector implements PipeConnector {
       throws Exception {
     int tcpBindPort =
         parameters.getIntOrDefault(
-            CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, 
CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, 
SINK_OPC_UA_TCP_BIND_PORT_KEY),
+            CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
     int httpsBindPort =
         parameters.getIntOrDefault(
-            CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY, 
CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY, 
SINK_OPC_UA_HTTPS_BIND_PORT_KEY),
+            CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);
 
     String user =
-        parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, 
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+            CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
     String password =
         parameters.getStringOrDefault(
-            CONNECTOR_IOTDB_PASSWORD_KEY, 
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, 
SINK_IOTDB_PASSWORD_KEY),
+            CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
     String securityDir =
         parameters.getStringOrDefault(
-            CONNECTOR_OPC_UA_SECURITY_DIR_KEY, 
CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, 
SINK_OPC_UA_SECURITY_DIR_KEY),
+            CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
 
     synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
       serverKey = httpsBindPort + ":" + tcpBindPort;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 9ee87b1df54..3329fd3bcf1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.Optional;
@@ -68,7 +69,9 @@ public class WebSocketConnector implements PipeConnector {
       throws Exception {
     port =
         parameters.getIntOrDefault(
-            PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
+            Arrays.asList(
+                PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
+                PipeConnectorConstant.SINK_WEBSOCKET_PORT_KEY),
             PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index ee7adb96601..c68e3047404 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -43,17 +43,23 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
 
 public class IoTDBDataRegionExtractor implements PipeExtractor {
 
@@ -78,24 +84,40 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
         .validateAttributeValueRange(
             EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
         .validateAttributeValueRange(
-            EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+            EXTRACTOR_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
         .validate(
             args -> (boolean) args[0] || (boolean) args[1],
-            String.format(
-                "Should not set both %s and %s to false.",
-                EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE),
-            
validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, 
true),
-            
validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true));
+            "Should not set both history.enable and realtime.enable to false.",
+            validator
+                .getParameters()
+                .getBooleanOrDefault(
+                    Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+                    EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE),
+            validator
+                .getParameters()
+                .getBooleanOrDefault(
+                    Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+                    EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
 
     // Validate extractor.realtime.mode
-    if 
(validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, 
true)) {
+    if (validator
+        .getParameters()
+        .getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+            EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
       validator.validateAttributeValueRange(
-          EXTRACTOR_REALTIME_MODE,
+          validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
+              ? EXTRACTOR_REALTIME_MODE_KEY
+              : SOURCE_REALTIME_MODE_KEY,
           true,
-          EXTRACTOR_REALTIME_MODE_FILE,
-          EXTRACTOR_REALTIME_MODE_HYBRID,
-          EXTRACTOR_REALTIME_MODE_LOG,
-          EXTRACTOR_REALTIME_MODE_FORCED_LOG);
+          EXTRACTOR_REALTIME_MODE_FILE_VALUE,
+          EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
+          EXTRACTOR_REALTIME_MODE_LOG_VALUE,
+          EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE);
     }
 
     constructHistoricalExtractor();
@@ -112,28 +134,31 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
 
   private void constructRealtimeExtractor(PipeParameters parameters) {
     // Enable realtime extractor by default
-    if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
+    if (!parameters.getBooleanOrDefault(
+        Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+        EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
       realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
-      LOGGER.info("'{}' is set to false, use fake realtime extractor.", 
EXTRACTOR_REALTIME_ENABLE);
+      LOGGER.info(
+          "'{}' is set to false, use fake realtime extractor.", 
EXTRACTOR_REALTIME_ENABLE_KEY);
       return;
     }
 
     // Use hybrid mode by default
-    if (!parameters.hasAttribute(EXTRACTOR_REALTIME_MODE)) {
+    if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
       realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
-      LOGGER.info("'{}' is not set, use hybrid mode by default.", 
EXTRACTOR_REALTIME_MODE);
+      LOGGER.info("'{}' is not set, use hybrid mode by default.", 
EXTRACTOR_REALTIME_MODE_KEY);
       return;
     }
 
-    switch (parameters.getString(EXTRACTOR_REALTIME_MODE)) {
-      case EXTRACTOR_REALTIME_MODE_FILE:
+    switch (parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
+      case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
         realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
         break;
-      case EXTRACTOR_REALTIME_MODE_HYBRID:
-      case EXTRACTOR_REALTIME_MODE_LOG:
+      case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
+      case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         break;
-      case EXTRACTOR_REALTIME_MODE_FORCED_LOG:
+      case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
         realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
         break;
       default:
@@ -141,7 +166,7 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
               "Unsupported extractor realtime mode: {}, create a hybrid 
extractor.",
-              parameters.getString(EXTRACTOR_REALTIME_MODE));
+              parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY));
         }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 37da7db751b..009a9f510c9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -42,18 +42,25 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayDeque;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_SLOPPY_TIME_RANGE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
 
 public class PipeHistoricalDataRegionTsFileExtractor implements 
PipeHistoricalDataRegionExtractor {
 
@@ -99,7 +106,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L);
     }
 
-    pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, 
EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    pattern =
+        parameters.getStringOrDefault(
+            Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
+            EXTRACTOR_PATTERN_DEFAULT_VALUE);
     final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new 
DataRegionId(environment.getRegionId()));
     if (dataRegion != null) {
@@ -114,21 +124,32 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     // User may set the EXTRACTOR_HISTORY_START_TIME and EXTRACTOR_HISTORY_END_TIME without
     // enabling the historical data extraction, which may affect the realtime data extraction.
     final boolean isHistoricalExtractorEnabledByUser =
-        parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true);
+        parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+            EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
     historicalDataExtractionStartTime =
-        isHistoricalExtractorEnabledByUser && 
parameters.hasAttribute(EXTRACTOR_HISTORY_START_TIME)
+        isHistoricalExtractorEnabledByUser
+                && parameters.hasAnyAttributes(
+                    EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(EXTRACTOR_HISTORY_START_TIME), 
ZoneId.systemDefault())
+                parameters.getString(
+                    EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY),
+                ZoneId.systemDefault())
             : Long.MIN_VALUE;
     historicalDataExtractionEndTime =
-        isHistoricalExtractorEnabledByUser && 
parameters.hasAttribute(EXTRACTOR_HISTORY_END_TIME)
+        isHistoricalExtractorEnabledByUser
+                && parameters.hasAnyAttributes(
+                    EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(EXTRACTOR_HISTORY_END_TIME), 
ZoneId.systemDefault())
+                parameters.getString(EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY),
+                ZoneId.systemDefault())
             : Long.MAX_VALUE;
 
     // Enable historical extractor by default
     historicalDataExtractionTimeLowerBound =
-        parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true)
+        parameters.getBooleanOrDefault(
+                Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+                EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE)
             ? Long.MIN_VALUE
             // We define the realtime data as the data generated after the creation time
             // of the pipe from user's perspective. But we still need to use
@@ -166,7 +187,26 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       }
     }
 
-    sloppyTimeRange = 
parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_SLOPPY_TIME_RANGE, false);
+    sloppyTimeRange =
+        Arrays.stream(
+                parameters
+                    .getStringOrDefault(
+                        Arrays.asList(
+                            EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, 
SOURCE_HISTORY_LOOSE_RANGE_KEY),
+                        "")
+                    .split(","))
+            .map(String::trim)
+            .map(String::toLowerCase)
+            .collect(Collectors.toSet())
+            .contains("time");
+
+    LOGGER.info(
+        "historical data extraction time range, start time {}({}), end time 
{}({}), sloppy time range {}",
+        DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
+        historicalDataExtractionStartTime,
+        DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
+        historicalDataExtractionEndTime,
+        sloppyTimeRange);
   }
 
   private void flushDataRegionAllTsFiles() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 3087a6ef41f..fb4e6e521c9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -36,9 +36,13 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
+
 public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor 
{
 
   protected String pipeName;
@@ -80,7 +84,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
 
     pattern =
         parameters.getStringOrDefault(
-            PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
+            Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
             PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
     final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new 
DataRegionId(environment.getRegionId()));
@@ -95,7 +99,9 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
 
     isForwardingPipeRequests =
         parameters.getBooleanOrDefault(
-            PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+            Arrays.asList(
+                PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+                PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
             
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
 
     // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. These metrics are
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
index 75c5955c491..2798d18fa61 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
@@ -33,6 +33,8 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import java.util.Arrays;
+
 public class PipeTaskExtractorStage extends PipeTaskStage {
 
   private final PipeExtractor pipeExtractor;
@@ -46,7 +48,8 @@ public class PipeTaskExtractorStage extends PipeTaskStage {
     pipeExtractor =
         extractorParameters
                 .getStringOrDefault(
-                    PipeExtractorConstant.EXTRACTOR_KEY,
+                    Arrays.asList(
+                        PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
                     BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
                 .equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
             ? new IoTDBDataRegionExtractor()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index fbff28fed45..6450247a53f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,14 +68,16 @@ public class PipeConnectorSubtaskManager {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       final int connectorNum =
           pipeConnectorParameters.getIntOrDefault(
-              PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+              Arrays.asList(
+                  PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+                  PipeConnectorConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
               
PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
       final List<PipeConnectorSubtaskLifeCycle> 
pipeConnectorSubtaskLifeCycleList =
           new ArrayList<>(connectorNum);
 
       final String connectorKey =
           pipeConnectorParameters.getStringOrDefault(
-              PipeConnectorConstant.CONNECTOR_KEY,
+              Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY),
               BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
       // Shared pending queue for all subtasks
       final BoundedBlockingPendingQueue<Event> pendingQueue =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 52776115030..cbad758cc55 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -605,9 +605,10 @@ public class LoadTsfileAnalyzer {
           }
 
           // check encoding
-          if 
(!tsFileSchema.getEncodingType().equals(iotdbSchema.getEncodingType())) {
+          if (LOGGER.isDebugEnabled()
+              && 
!tsFileSchema.getEncodingType().equals(iotdbSchema.getEncodingType())) {
             // we allow a measurement to have different encodings in different chunks
-            LOGGER.warn(
+            LOGGER.debug(
                 "Encoding type not match, measurement: {}{}{}, "
                     + "TsFile encoding: {}, IoTDB encoding: {}",
                 device,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
index 3d4bbbbb9b3..1914316fd28 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
@@ -41,10 +41,12 @@ public class IoTDBDataRegionExtractorTest {
                       put(
                           PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                           Boolean.TRUE.toString());
-                      put(PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE, 
Boolean.TRUE.toString());
                       put(
-                          PipeExtractorConstant.EXTRACTOR_REALTIME_MODE,
-                          
PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID);
+                          PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+                          Boolean.TRUE.toString());
+                      put(
+                          PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY,
+                          
PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE);
                     }
                   })));
     } catch (Exception e) {

Reply via email to