This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-rename
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e1e620dfb5b2dd36971ff292a4ca5b63e8b168d0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Oct 27 18:28:15 2023 +0800

    rename: extractor -> source
---
 .../api/customizer/parameter/PipeParameters.java   | 135 ++++++++++++++++++---
 .../persistence/pipe/PipePluginInfo.java           |   3 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      |   4 +-
 .../config/constant/PipeConnectorConstant.java     |  18 +++
 .../config/constant/PipeExtractorConstant.java     |  32 +++--
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |  81 ++++++++-----
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  48 ++++++--
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  10 +-
 .../db/pipe/task/stage/PipeTaskExtractorStage.java |   5 +-
 .../extractor/IoTDBDataRegionExtractorTest.java    |   8 +-
 10 files changed, 270 insertions(+), 74 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index cfea33aa3b9..2d00ce717b8 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.pipe.api.PipeProcessor;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -51,33 +52,73 @@ public class PipeParameters {
     return attributes.containsKey(key);
   }
 
-  public String getString(String key) {
-    return attributes.get(key);
+  public boolean hasAnyAttributes(String... keys) {
+    for (final String key : keys) {
+      if (attributes.containsKey(key)) {
+        return true;
+      }
+    }
+    return false;
   }
 
-  public Boolean getBoolean(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Boolean.parseBoolean(value);
+  public String getString(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return value;
+      }
+    }
+    return null;
   }
 
-  public Integer getInt(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Integer.parseInt(value);
+  public Boolean getBoolean(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Boolean.parseBoolean(value);
+      }
+    }
+    return null;
   }
 
-  public Long getLong(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Long.parseLong(value);
+  public Integer getInt(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Integer.parseInt(value);
+      }
+    }
+    return null;
   }
 
-  public Float getFloat(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Float.parseFloat(value);
+  public Long getLong(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Long.parseLong(value);
+      }
+    }
+    return null;
   }
 
-  public Double getDouble(String key) {
-    String value = attributes.get(key);
-    return value == null ? null : Double.parseDouble(value);
+  public Float getFloat(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Float.parseFloat(value);
+      }
+    }
+    return null;
+  }
+
+  public Double getDouble(String... keys) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Double.parseDouble(value);
+      }
+    }
+    return null;
   }
 
   public String getStringOrDefault(String key, String defaultValue) {
@@ -110,6 +151,66 @@ public class PipeParameters {
     return value == null ? defaultValue : Double.parseDouble(value);
   }
 
+  public String getStringOrDefault(List<String> keys, String defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return value;
+      }
+    }
+    return defaultValue;
+  }
+
+  public boolean getBooleanOrDefault(List<String> keys, boolean defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Boolean.parseBoolean(value);
+      }
+    }
+    return defaultValue;
+  }
+
+  public int getIntOrDefault(List<String> keys, int defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Integer.parseInt(value);
+      }
+    }
+    return defaultValue;
+  }
+
+  public long getLongOrDefault(List<String> keys, long defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Long.parseLong(value);
+      }
+    }
+    return defaultValue;
+  }
+
+  public float getFloatOrDefault(List<String> keys, float defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Float.parseFloat(value);
+      }
+    }
+    return defaultValue;
+  }
+
+  public double getDoubleOrDefault(List<String> keys, double defaultValue) {
+    for (final String key : keys) {
+      final String value = attributes.get(key);
+      if (value != null) {
+        return Double.parseDouble(value);
+      }
+    }
+    return defaultValue;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index bdc30ddd683..5b1510957c7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -126,7 +126,8 @@ public class PipePluginInfo implements SnapshotProcessor {
         new PipeParameters(createPipeRequest.getExtractorAttributes());
     final String extractorPluginName =
         extractorParameters.getStringOrDefault(
-            PipeExtractorConstant.EXTRACTOR_KEY, 
IOTDB_EXTRACTOR.getPipePluginName());
+            Arrays.asList(PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
+            IOTDB_EXTRACTOR.getPipePluginName());
     if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) {
       final String exceptionMessage =
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 59b18f77014..b1914c2bc61 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class PipePluginAgent {
@@ -203,7 +204,8 @@ public class PipePluginAgent {
     return (PipeExtractor)
         reflect(
             extractorParameters.getStringOrDefault(
-                PipeExtractorConstant.EXTRACTOR_KEY,
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
                 BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()));
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 8ed5af7ed56..93e28a0a040 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -29,51 +29,69 @@ import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 public class PipeConnectorConstant {
 
   public static final String CONNECTOR_KEY = "connector";
+  public static final String SINK_KEY = "sink";
 
   public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
+  public static final String SINK_IOTDB_IP_KEY = "sink.ip";
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
+  public static final String SINK_IOTDB_PORT_KEY = "sink.port";
   public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = 
"connector.node-urls";
+  public static final String SINK_IOTDB_NODE_URLS_KEY = "sink.node-urls";
 
   public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY = 
"connector.parallel.tasks";
+  public static final String SINK_IOTDB_PARALLEL_TASKS_KEY = 
"sink.parallel.tasks";
   public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
       PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
 
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = 
"connector.batch.enable";
+  public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = 
"sink.batch.enable";
   public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE 
= true;
 
   public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = 
"connector.batch.max-delay-seconds";
+  public static final String SINK_IOTDB_BATCH_DELAY_KEY = 
"sink.batch.max-delay-seconds";
   public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1;
 
   public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = 
"connector.batch.size-bytes";
+  public static final String SINK_IOTDB_BATCH_SIZE_KEY = 
"sink.batch.size-bytes";
   public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
 
   public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
+  public static final String SINK_IOTDB_USER_KEY = "sink.user";
   public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
 
   public static final String CONNECTOR_IOTDB_PASSWORD_KEY = 
"connector.password";
+  public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password";
   public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
 
   public static final String CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY =
       "connector.air-gap.e-language.enable";
+  public static final String SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY = 
"sink.air-gap.e-language.enable";
   public static final boolean 
CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE = false;
 
   public static final String CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY =
       "connector.air-gap.handshake-timeout-ms";
+  public static final String SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY =
+      "sink.air-gap.handshake-timeout-ms";
   public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE 
= 5000;
 
   public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = 
"connector.version";
+  public static final String SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY = 
"sink.version";
   public static final String 
CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
 
   public static final String CONNECTOR_WEBSOCKET_PORT_KEY = 
"connector.websocket.port";
+  public static final String SINK_WEBSOCKET_PORT_KEY = "sink.websocket.port";
   public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080;
 
   public static final String CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY = 
"connector.opcua.tcp.port";
+  public static final String SINK_OPC_UA_TCP_BIND_PORT_KEY = 
"sink.opcua.tcp.port";
   public static final int CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686;
 
   public static final String CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY = 
"connector.opcua.https.port";
+  public static final String SINK_OPC_UA_HTTPS_BIND_PORT_KEY = 
"sink.opcua.https.port";
   public static final int CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE = 
8443;
 
   public static final String CONNECTOR_OPC_UA_SECURITY_DIR_KEY = 
"connector.opcua.security.dir";
+  public static final String SINK_OPC_UA_SECURITY_DIR_KEY = 
"sink.opcua.security.dir";
   public static final String CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE =
       IoTDBDescriptor.getInstance().getConfDir() != null
           ? IoTDBDescriptor.getInstance().getConfDir() + File.separatorChar + 
"opc_security"
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index f432e9838cd..a1eead9e6f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -22,25 +22,37 @@ package org.apache.iotdb.db.pipe.config.constant;
 public class PipeExtractorConstant {
 
   public static final String EXTRACTOR_KEY = "extractor";
+  public static final String SOURCE_KEY = "source";
 
   public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
+  public static final String SOURCE_PATTERN_KEY = "source.pattern";
   public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root";
 
   public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY =
       "extractor.forwarding-pipe-requests";
+  public static final String SOURCE_FORWARDING_PIPE_REQUESTS_KEY =
+      "source.forwarding-pipe-requests";
   public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE 
= true;
 
   public static final String EXTRACTOR_HISTORY_ENABLE_KEY = 
"extractor.history.enable";
-  public static final String EXTRACTOR_HISTORY_START_TIME = 
"extractor.history.start-time";
-  public static final String EXTRACTOR_HISTORY_END_TIME = 
"extractor.history.end-time";
-  public static final String EXTRACTOR_HISTORY_LOOSE_RANGE = 
"extractor.history.loose-range";
-
-  public static final String EXTRACTOR_REALTIME_ENABLE = 
"extractor.realtime.enable";
-  public static final String EXTRACTOR_REALTIME_MODE = 
"extractor.realtime.mode";
-  public static final String EXTRACTOR_REALTIME_MODE_HYBRID = "hybrid";
-  public static final String EXTRACTOR_REALTIME_MODE_FILE = "file";
-  public static final String EXTRACTOR_REALTIME_MODE_LOG = "log";
-  public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG = "forced-log";
+  public static final String SOURCE_HISTORY_ENABLE_KEY = 
"source.history.enable";
+  public static final boolean EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE = true;
+  public static final String EXTRACTOR_HISTORY_START_TIME_KEY = 
"extractor.history.start-time";
+  public static final String SOURCE_HISTORY_START_TIME_KEY = 
"source.history.start-time";
+  public static final String EXTRACTOR_HISTORY_END_TIME_KEY = 
"extractor.history.end-time";
+  public static final String SOURCE_HISTORY_END_TIME_KEY = 
"source.history.end-time";
+  public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_KEY = 
"extractor.history.loose-range";
+  public static final String SOURCE_HISTORY_LOOSE_RANGE_KEY = 
"source.history.loose-range";
+
+  public static final String EXTRACTOR_REALTIME_ENABLE_KEY = 
"extractor.realtime.enable";
+  public static final String SOURCE_REALTIME_ENABLE_KEY = 
"source.realtime.enable";
+  public static final boolean EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE = true;
+  public static final String EXTRACTOR_REALTIME_MODE_KEY = 
"extractor.realtime.mode";
+  public static final String SOURCE_REALTIME_MODE_KEY = "source.realtime.mode";
+  public static final String EXTRACTOR_REALTIME_MODE_HYBRID_VALUE = "hybrid";
+  public static final String EXTRACTOR_REALTIME_MODE_FILE_VALUE = "file";
+  public static final String EXTRACTOR_REALTIME_MODE_LOG_VALUE = "log";
+  public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE = 
"forced-log";
 
   private PipeExtractorConstant() {
     throw new IllegalStateException("Utility class");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index c0f120a0d40..f84020760b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -43,17 +43,23 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
 
 public class IoTDBDataRegionExtractor implements PipeExtractor {
 
@@ -78,24 +84,40 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
         .validateAttributeValueRange(
             EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
         .validateAttributeValueRange(
-            EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+            EXTRACTOR_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
         .validate(
             args -> (boolean) args[0] || (boolean) args[1],
-            String.format(
-                "Should not set both %s and %s to false.",
-                EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE),
-            
validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, 
true),
-            
validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true));
+            "Should not set both history.enable and realtime.enable to false.",
+            validator
+                .getParameters()
+                .getBooleanOrDefault(
+                    Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+                    EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE),
+            validator
+                .getParameters()
+                .getBooleanOrDefault(
+                    Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+                    EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
 
     // Validate extractor.realtime.mode
-    if 
(validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, 
true)) {
+    if (validator
+        .getParameters()
+        .getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+            EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
       validator.validateAttributeValueRange(
-          EXTRACTOR_REALTIME_MODE,
+          validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
+              ? EXTRACTOR_REALTIME_MODE_KEY
+              : SOURCE_REALTIME_MODE_KEY,
           true,
-          EXTRACTOR_REALTIME_MODE_FILE,
-          EXTRACTOR_REALTIME_MODE_HYBRID,
-          EXTRACTOR_REALTIME_MODE_LOG,
-          EXTRACTOR_REALTIME_MODE_FORCED_LOG);
+          EXTRACTOR_REALTIME_MODE_FILE_VALUE,
+          EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
+          EXTRACTOR_REALTIME_MODE_LOG_VALUE,
+          EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE);
     }
 
     constructHistoricalExtractor();
@@ -112,28 +134,31 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
 
   private void constructRealtimeExtractor(PipeParameters parameters) {
     // Enable realtime extractor by default
-    if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
+    if (!parameters.getBooleanOrDefault(
+        Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+        EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
       realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
-      LOGGER.info("'{}' is set to false, use fake realtime extractor.", 
EXTRACTOR_REALTIME_ENABLE);
+      LOGGER.info(
+          "'{}' is set to false, use fake realtime extractor.", 
EXTRACTOR_REALTIME_ENABLE_KEY);
       return;
     }
 
     // Use hybrid mode by default
-    if (!parameters.hasAttribute(EXTRACTOR_REALTIME_MODE)) {
+    if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
       realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
-      LOGGER.info("'{}' is not set, use hybrid mode by default.", 
EXTRACTOR_REALTIME_MODE);
+      LOGGER.info("'{}' is not set, use hybrid mode by default.", 
EXTRACTOR_REALTIME_MODE_KEY);
       return;
     }
 
-    switch (parameters.getString(EXTRACTOR_REALTIME_MODE)) {
-      case EXTRACTOR_REALTIME_MODE_FILE:
+    switch (parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
+      case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
         realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
         break;
-      case EXTRACTOR_REALTIME_MODE_HYBRID:
-      case EXTRACTOR_REALTIME_MODE_LOG:
+      case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
+      case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         break;
-      case EXTRACTOR_REALTIME_MODE_FORCED_LOG:
+      case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
         realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
         break;
       default:
@@ -141,7 +166,7 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
               "Unsupported extractor realtime mode: {}, create a hybrid 
extractor.",
-              parameters.getString(EXTRACTOR_REALTIME_MODE));
+              parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY));
         }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 0b00632ff0f..009a9f510c9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -49,12 +49,18 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
 
 public class PipeHistoricalDataRegionTsFileExtractor implements 
PipeHistoricalDataRegionExtractor {
 
@@ -100,7 +106,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L);
     }
 
-    pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, 
EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    pattern =
+        parameters.getStringOrDefault(
+            Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
+            EXTRACTOR_PATTERN_DEFAULT_VALUE);
     final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new 
DataRegionId(environment.getRegionId()));
     if (dataRegion != null) {
@@ -115,21 +124,32 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     // User may set the EXTRACTOR_HISTORY_START_TIME and 
EXTRACTOR_HISTORY_END_TIME without
     // enabling the historical data extraction, which may affect the realtime 
data extraction.
     final boolean isHistoricalExtractorEnabledByUser =
-        parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true);
+        parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+            EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
     historicalDataExtractionStartTime =
-        isHistoricalExtractorEnabledByUser && 
parameters.hasAttribute(EXTRACTOR_HISTORY_START_TIME)
+        isHistoricalExtractorEnabledByUser
+                && parameters.hasAnyAttributes(
+                    EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(EXTRACTOR_HISTORY_START_TIME), 
ZoneId.systemDefault())
+                parameters.getString(
+                    EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY),
+                ZoneId.systemDefault())
             : Long.MIN_VALUE;
     historicalDataExtractionEndTime =
-        isHistoricalExtractorEnabledByUser && 
parameters.hasAttribute(EXTRACTOR_HISTORY_END_TIME)
+        isHistoricalExtractorEnabledByUser
+                && parameters.hasAnyAttributes(
+                    EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(EXTRACTOR_HISTORY_END_TIME), 
ZoneId.systemDefault())
+                parameters.getString(EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY),
+                ZoneId.systemDefault())
             : Long.MAX_VALUE;
 
     // Enable historical extractor by default
     historicalDataExtractionTimeLowerBound =
-        parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true)
+        parameters.getBooleanOrDefault(
+                Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+                EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE)
             ? Long.MIN_VALUE
             // We define the realtime data as the data generated after the 
creation time
             // of the pipe from user's perspective. But we still need to use
@@ -168,7 +188,13 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     }
 
     sloppyTimeRange =
-        
Arrays.stream(parameters.getStringOrDefault(EXTRACTOR_HISTORY_LOOSE_RANGE, 
"").split(","))
+        Arrays.stream(
+                parameters
+                    .getStringOrDefault(
+                        Arrays.asList(
+                            EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, 
SOURCE_HISTORY_LOOSE_RANGE_KEY),
+                        "")
+                    .split(","))
             .map(String::trim)
             .map(String::toLowerCase)
             .collect(Collectors.toSet())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 2634424ed0d..fb88a91fb93 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -36,9 +36,13 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
+
 public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor 
{
 
   protected String pipeName;
@@ -78,7 +82,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
 
     pattern =
         parameters.getStringOrDefault(
-            PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
+            Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
             PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
     final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new 
DataRegionId(environment.getRegionId()));
@@ -93,7 +97,9 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
 
     isForwardingPipeRequests =
         parameters.getBooleanOrDefault(
-            PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+            Arrays.asList(
+                PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+                PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
             
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
index 75c5955c491..2798d18fa61 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
@@ -33,6 +33,8 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import java.util.Arrays;
+
 public class PipeTaskExtractorStage extends PipeTaskStage {
 
   private final PipeExtractor pipeExtractor;
@@ -46,7 +48,8 @@ public class PipeTaskExtractorStage extends PipeTaskStage {
     pipeExtractor =
         extractorParameters
                 .getStringOrDefault(
-                    PipeExtractorConstant.EXTRACTOR_KEY,
+                    Arrays.asList(
+                        PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
                     BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
                 .equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
             ? new IoTDBDataRegionExtractor()
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
index 3d4bbbbb9b3..1914316fd28 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
@@ -41,10 +41,12 @@ public class IoTDBDataRegionExtractorTest {
                       put(
                           PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                           Boolean.TRUE.toString());
-                      put(PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE, 
Boolean.TRUE.toString());
                       put(
-                          PipeExtractorConstant.EXTRACTOR_REALTIME_MODE,
-                          
PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID);
+                          PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+                          Boolean.TRUE.toString());
+                      put(
+                          PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY,
+                          
PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE);
                     }
                   })));
     } catch (Exception e) {


Reply via email to