This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-rename
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a1752caf27565c46c9653a13fe527fd5756177ce
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Oct 27 19:35:36 2023 +0800

    rename: connector -> sink
---
 .../persistence/pipe/PipePluginInfo.java           |  6 ++--
 .../db/pipe/agent/plugin/PipePluginAgent.java      |  7 ++--
 .../builder/PipeTransferBatchReqBuilder.java       |  9 +++--
 .../db/pipe/connector/protocol/IoTDBConnector.java | 40 +++++++++++++++++++---
 .../protocol/airgap/IoTDBAirGapConnector.java      |  8 +++--
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  | 37 +++++++++++++++-----
 .../connector/protocol/opcua/OpcUaConnector.java   | 22 +++++++++---
 .../protocol/websocket/WebSocketConnector.java     |  5 ++-
 .../connector/PipeConnectorSubtaskManager.java     |  7 ++--
 9 files changed, 112 insertions(+), 29 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 5b1510957c7..85ab9f69adc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -153,14 +153,16 @@ public class PipePluginInfo implements SnapshotProcessor {
 
     final PipeParameters connectorParameters =
         new PipeParameters(createPipeRequest.getConnectorAttributes());
-    if 
(!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+    if (!connectorParameters.hasAnyAttributes(
+        PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
       final String exceptionMessage =
           "Failed to create pipe, the pipe connector plugin is not specified";
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
     final String connectorPluginName =
-        connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY);
+        connectorParameters.getString(
+            PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY);
     if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
       final String exceptionMessage =
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index b1914c2bc61..a7edab29de8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -218,13 +218,16 @@ public class PipePluginAgent {
   }
 
   public PipeConnector reflectConnector(PipeParameters connectorParameters) {
-    if 
(!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+    if (!connectorParameters.hasAnyAttributes(
+        PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
       throw new PipeException(
           "Failed to reflect PipeConnector instance because "
               + "'connector' is not specified in the parameters.");
     }
     return (PipeConnector)
-        
reflect(connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY));
+        reflect(
+            connectorParameters.getString(
+                PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY));
   }
 
   private PipePlugin reflect(String pluginName) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 7d3d3b38c23..9521e6a97aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -32,12 +32,15 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
 
 public abstract class PipeTransferBatchReqBuilder {
 
@@ -55,11 +58,13 @@ public abstract class PipeTransferBatchReqBuilder {
   protected PipeTransferBatchReqBuilder(PipeParameters parameters) {
     maxDelayInMs =
         parameters.getIntOrDefault(
-                CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
+                Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
+                CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
             * 1000;
     maxBatchSizeInBytes =
         parameters.getLongOrDefault(
-            CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
+            CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
   }
 
   public List<TPipeTransferReq> getTPipeTransferReqs() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index 1ca5db5ba15..3a2bae03068 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -40,6 +40,10 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
 
 public abstract class IoTDBConnector implements PipeConnector {
 
@@ -53,13 +57,25 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   public void validate(PipeParameterValidator validator) throws Exception {
     final PipeParameters parameters = validator.getParameters();
     validator.validate(
-        args -> (boolean) args[0] || ((boolean) args[1] && (boolean) args[2]),
+        args ->
+            (boolean) args[0]
+                || ((boolean) args[1] && (boolean) args[2])
+                || (boolean) args[3]
+                || ((boolean) args[4] && (boolean) args[5]),
         String.format(
-            "Either %s or %s:%s must be specified",
-            CONNECTOR_IOTDB_NODE_URLS_KEY, CONNECTOR_IOTDB_IP_KEY, 
CONNECTOR_IOTDB_PORT_KEY),
+            "One of %s, %s:%s, %s, %s:%s must be specified",
+            CONNECTOR_IOTDB_NODE_URLS_KEY,
+            CONNECTOR_IOTDB_IP_KEY,
+            CONNECTOR_IOTDB_PORT_KEY,
+            SINK_IOTDB_NODE_URLS_KEY,
+            SINK_IOTDB_IP_KEY,
+            SINK_IOTDB_PORT_KEY),
         parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
         parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
-        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY));
+        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+        parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
+        parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+        parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
   }
 
   @Override
@@ -75,19 +91,33 @@ public abstract class IoTDBConnector implements 
PipeConnector {
               parameters.getInt(CONNECTOR_IOTDB_PORT_KEY)));
     }
 
+    if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
+        && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
+      givenNodeUrls.add(
+          new TEndPoint(
+              parameters.getString(SINK_IOTDB_IP_KEY), 
parameters.getInt(SINK_IOTDB_PORT_KEY)));
+    }
+
     if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
       givenNodeUrls.addAll(
           SessionUtils.parseSeedNodeUrls(
               
Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
     }
 
+    if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
+      givenNodeUrls.addAll(
+          SessionUtils.parseSeedNodeUrls(
+              
Arrays.asList(parameters.getString(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+    }
+
     nodeUrls.clear();
     nodeUrls.addAll(givenNodeUrls);
     LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
 
     isTabletBatchModeEnabled =
         parameters.getBooleanOrDefault(
-            CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, 
CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, 
SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
+            CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
     LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index add1c166687..8beb6f4d9bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -64,6 +64,8 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
 
 public class IoTDBAirGapConnector extends IoTDBConnector {
 
@@ -98,14 +100,16 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
 
     handshakeTimeoutMs =
         parameters.getIntOrDefault(
-            CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY,
+            Arrays.asList(
+                CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY, 
SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY),
             CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE);
     LOGGER.info(
         "IoTDBAirGapConnector is customized with handshakeTimeoutMs: {}.", 
handshakeTimeoutMs);
 
     eLanguageEnable =
         parameters.getBooleanOrDefault(
-            CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY,
+            Arrays.asList(
+                CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY, 
SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY),
             CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE);
     LOGGER.info("IoTDBAirGapConnector is customized with eLanguageEnable: 
{}.", eLanguageEnable);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 2fa42762897..f8a05de6f42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -58,6 +58,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
@@ -67,6 +68,11 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
 
 public class IoTDBLegacyPipeConnector implements PipeConnector {
 
@@ -91,26 +97,41 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    validator
-        .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY)
-        .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY);
+    final PipeParameters parameters = validator.getParameters();
+    validator.validate(
+        args ->
+            ((boolean) args[0] && (boolean) args[1]) || ((boolean) args[2] && 
(boolean) args[3]),
+        String.format(
+            "Either %s:%s or %s:%s must be specified",
+            CONNECTOR_IOTDB_IP_KEY,
+            CONNECTOR_IOTDB_PORT_KEY,
+            SINK_IOTDB_IP_KEY,
+            SINK_IOTDB_PORT_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+        parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+        parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
   }
 
   @Override
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
-    ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
-    port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
+    ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY, 
SINK_IOTDB_IP_KEY);
+    port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY, SINK_IOTDB_PORT_KEY);
 
     user =
-        parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, 
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+            CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
     password =
         parameters.getStringOrDefault(
-            CONNECTOR_IOTDB_PASSWORD_KEY, 
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, 
SINK_IOTDB_PASSWORD_KEY),
+            CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
 
     syncConnectorVersion =
         parameters.getStringOrDefault(
-            CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY,
+            Arrays.asList(
+                CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY, 
SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY),
             CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE);
 
     pipeName = configuration.getRuntimeEnvironment().getPipeName();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index cdfcb044392..dad75907cd9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -45,6 +45,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -60,6 +61,11 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
 
 /**
  * Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data 
are converted into
@@ -86,19 +92,25 @@ public class OpcUaConnector implements PipeConnector {
       throws Exception {
     int tcpBindPort =
         parameters.getIntOrDefault(
-            CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, 
CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, 
SINK_OPC_UA_TCP_BIND_PORT_KEY),
+            CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
     int httpsBindPort =
         parameters.getIntOrDefault(
-            CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY, 
CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY, 
SINK_OPC_UA_HTTPS_BIND_PORT_KEY),
+            CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);
 
     String user =
-        parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, 
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+            CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
     String password =
         parameters.getStringOrDefault(
-            CONNECTOR_IOTDB_PASSWORD_KEY, 
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, 
SINK_IOTDB_PASSWORD_KEY),
+            CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
     String securityDir =
         parameters.getStringOrDefault(
-            CONNECTOR_OPC_UA_SECURITY_DIR_KEY, 
CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
+            Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, 
SINK_OPC_UA_SECURITY_DIR_KEY),
+            CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
 
     synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
       serverKey = httpsBindPort + ":" + tcpBindPort;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 9ee87b1df54..3329fd3bcf1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.Optional;
@@ -68,7 +69,9 @@ public class WebSocketConnector implements PipeConnector {
       throws Exception {
     port =
         parameters.getIntOrDefault(
-            PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
+            Arrays.asList(
+                PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
+                PipeConnectorConstant.SINK_WEBSOCKET_PORT_KEY),
             PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index fbff28fed45..6450247a53f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,14 +68,16 @@ public class PipeConnectorSubtaskManager {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       final int connectorNum =
           pipeConnectorParameters.getIntOrDefault(
-              PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+              Arrays.asList(
+                  PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+                  PipeConnectorConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
               
PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
       final List<PipeConnectorSubtaskLifeCycle> 
pipeConnectorSubtaskLifeCycleList =
           new ArrayList<>(connectorNum);
 
       final String connectorKey =
           pipeConnectorParameters.getStringOrDefault(
-              PipeConnectorConstant.CONNECTOR_KEY,
+              Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY),
               BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
       // Shared pending queue for all subtasks
       final BoundedBlockingPendingQueue<Event> pendingQueue =

Reply via email to