This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-rename in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a1752caf27565c46c9653a13fe527fd5756177ce Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Oct 27 19:35:36 2023 +0800 rename: connector -> sink --- .../persistence/pipe/PipePluginInfo.java | 6 ++-- .../db/pipe/agent/plugin/PipePluginAgent.java | 7 ++-- .../builder/PipeTransferBatchReqBuilder.java | 9 +++-- .../db/pipe/connector/protocol/IoTDBConnector.java | 40 +++++++++++++++++++--- .../protocol/airgap/IoTDBAirGapConnector.java | 8 +++-- .../protocol/legacy/IoTDBLegacyPipeConnector.java | 37 +++++++++++++++----- .../connector/protocol/opcua/OpcUaConnector.java | 22 +++++++++--- .../protocol/websocket/WebSocketConnector.java | 5 ++- .../connector/PipeConnectorSubtaskManager.java | 7 ++-- 9 files changed, 112 insertions(+), 29 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 5b1510957c7..85ab9f69adc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -153,14 +153,16 @@ public class PipePluginInfo implements SnapshotProcessor { final PipeParameters connectorParameters = new PipeParameters(createPipeRequest.getConnectorAttributes()); - if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) { + if (!connectorParameters.hasAnyAttributes( + PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) { final String exceptionMessage = "Failed to create pipe, the pipe connector plugin is not specified"; LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } final String connectorPluginName = - connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY); + connectorParameters.getString( + PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY); if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) { final String exceptionMessage = String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java index b1914c2bc61..a7edab29de8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java @@ -218,13 +218,16 @@ public class PipePluginAgent { } public PipeConnector reflectConnector(PipeParameters connectorParameters) { - if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) { + if (!connectorParameters.hasAnyAttributes( + PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) { throw new PipeException( "Failed to reflect PipeConnector instance because " + "'connector' is not specified in the parameters."); } return (PipeConnector) - reflect(connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY)); + reflect( + connectorParameters.getString( + PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)); } private PipePlugin reflect(String pluginName) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java index 7d3d3b38c23..9521e6a97aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java @@ -32,12 +32,15 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY; public abstract class PipeTransferBatchReqBuilder { @@ -55,11 +58,13 @@ public abstract class PipeTransferBatchReqBuilder { protected PipeTransferBatchReqBuilder(PipeParameters parameters) { maxDelayInMs = parameters.getIntOrDefault( - CONNECTOR_IOTDB_BATCH_DELAY_KEY, CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE) + Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, SINK_IOTDB_BATCH_DELAY_KEY), + CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE) * 1000; maxBatchSizeInBytes = parameters.getLongOrDefault( - CONNECTOR_IOTDB_BATCH_SIZE_KEY, CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE); + Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY), + CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE); } public List<TPipeTransferReq> getTPipeTransferReqs() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java index 1ca5db5ba15..3a2bae03068 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java @@ -40,6 +40,10 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY; public abstract class IoTDBConnector implements PipeConnector { @@ -53,13 +57,25 @@ public abstract class IoTDBConnector implements PipeConnector { public void validate(PipeParameterValidator validator) throws Exception { final PipeParameters parameters = validator.getParameters(); validator.validate( - args -> (boolean) args[0] || ((boolean) args[1] && (boolean) args[2]), + args -> + (boolean) args[0] + || ((boolean) args[1] && (boolean) args[2]) + || (boolean) args[3] + || ((boolean) args[4] && (boolean) args[5]), String.format( - "Either %s or %s:%s must be specified", - CONNECTOR_IOTDB_NODE_URLS_KEY, CONNECTOR_IOTDB_IP_KEY, CONNECTOR_IOTDB_PORT_KEY), + "One of %s, %s:%s, %s, %s:%s must be specified", + CONNECTOR_IOTDB_NODE_URLS_KEY, + CONNECTOR_IOTDB_IP_KEY, + CONNECTOR_IOTDB_PORT_KEY, + SINK_IOTDB_NODE_URLS_KEY, + SINK_IOTDB_IP_KEY, + SINK_IOTDB_PORT_KEY), parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY), parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY), - parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)); + parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY), + parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY), + parameters.hasAttribute(SINK_IOTDB_IP_KEY), + parameters.hasAttribute(SINK_IOTDB_PORT_KEY)); } @Override @@ -75,19 +91,33 @@ public abstract class IoTDBConnector implements PipeConnector { parameters.getInt(CONNECTOR_IOTDB_PORT_KEY))); } + if (parameters.hasAttribute(SINK_IOTDB_IP_KEY) + && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) { + givenNodeUrls.add( + new TEndPoint( + parameters.getString(SINK_IOTDB_IP_KEY), parameters.getInt(SINK_IOTDB_PORT_KEY))); + } + if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) { givenNodeUrls.addAll( SessionUtils.parseSeedNodeUrls( Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(",")))); } + if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) { + givenNodeUrls.addAll( + SessionUtils.parseSeedNodeUrls( + Arrays.asList(parameters.getString(SINK_IOTDB_NODE_URLS_KEY).split(",")))); + } + nodeUrls.clear(); nodeUrls.addAll(givenNodeUrls); LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls); isTabletBatchModeEnabled = parameters.getBooleanOrDefault( - CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); + Arrays.asList(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, SINK_IOTDB_BATCH_MODE_ENABLE_KEY), + CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java index add1c166687..8beb6f4d9bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java @@ -64,6 +64,8 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY; public class IoTDBAirGapConnector extends IoTDBConnector { @@ -98,14 +100,16 @@ public class IoTDBAirGapConnector extends IoTDBConnector { handshakeTimeoutMs = parameters.getIntOrDefault( - CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY, + Arrays.asList( + CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY, SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY), CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE); LOGGER.info( "IoTDBAirGapConnector is customized with handshakeTimeoutMs: {}.", handshakeTimeoutMs); eLanguageEnable = parameters.getBooleanOrDefault( - CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY, + Arrays.asList( + CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY, SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY), CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE); LOGGER.info("IoTDBAirGapConnector is customized with eLanguageEnable: {}.", eLanguageEnable); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java index 2fa42762897..f8a05de6f42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java @@ -58,6 +58,7 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.util.Arrays; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; @@ -67,6 +68,11 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY; public class IoTDBLegacyPipeConnector implements PipeConnector { @@ -91,26 +97,41 @@ public class IoTDBLegacyPipeConnector implements PipeConnector { @Override public void validate(PipeParameterValidator validator) throws Exception { - validator - .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY) - .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY); + final PipeParameters parameters = validator.getParameters(); + validator.validate( + args -> + ((boolean) args[0] && (boolean) args[1]) || ((boolean) args[2] && (boolean) args[3]), + String.format( + "Either %s:%s or %s:%s must be specified", + CONNECTOR_IOTDB_IP_KEY, + CONNECTOR_IOTDB_PORT_KEY, + SINK_IOTDB_IP_KEY, + SINK_IOTDB_PORT_KEY), + parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY), + parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY), + parameters.hasAttribute(SINK_IOTDB_IP_KEY), + parameters.hasAttribute(SINK_IOTDB_PORT_KEY)); } @Override public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) throws Exception { - ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY); - port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY); + ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY, SINK_IOTDB_IP_KEY); + port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY, SINK_IOTDB_PORT_KEY); user = - parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE); + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY), + CONNECTOR_IOTDB_USER_DEFAULT_VALUE); password = parameters.getStringOrDefault( - CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); + Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY), + CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); syncConnectorVersion = parameters.getStringOrDefault( - CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY, + Arrays.asList( + CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY, SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY), CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE); pipeName = configuration.getRuntimeEnvironment().getPipeName(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java index cdfcb044392..dad75907cd9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java @@ -45,6 +45,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -60,6 +61,11 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_SECURITY_DIR_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY; /** * Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data are converted into @@ -86,19 +92,25 @@ public class OpcUaConnector implements PipeConnector { throws Exception { int tcpBindPort = parameters.getIntOrDefault( - CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE); + Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, SINK_OPC_UA_TCP_BIND_PORT_KEY), + CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE); int httpsBindPort = parameters.getIntOrDefault( - CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY, CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE); + Arrays.asList(CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY, SINK_OPC_UA_HTTPS_BIND_PORT_KEY), + CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE); String user = - parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE); + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY), + CONNECTOR_IOTDB_USER_DEFAULT_VALUE); String password = parameters.getStringOrDefault( - CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); + Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY), + CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); String securityDir = parameters.getStringOrDefault( - CONNECTOR_OPC_UA_SECURITY_DIR_KEY, CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE); + Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, SINK_OPC_UA_SECURITY_DIR_KEY), + CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE); synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) { serverKey = httpsBindPort + ":" + tcpBindPort; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java index 9ee87b1df54..3329fd3bcf1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Comparator; import java.util.Map; import java.util.Optional; @@ -68,7 +69,9 @@ public class WebSocketConnector implements PipeConnector { throws Exception { port = parameters.getIntOrDefault( - PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY, + Arrays.asList( + PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY, + PipeConnectorConstant.SINK_WEBSOCKET_PORT_KEY), PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java index fbff28fed45..6450247a53f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java @@ -40,6 +40,7 @@ import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -67,14 +68,16 @@ public class PipeConnectorSubtaskManager { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { final int connectorNum = pipeConnectorParameters.getIntOrDefault( - PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, + Arrays.asList( + PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, + PipeConnectorConstant.SINK_IOTDB_PARALLEL_TASKS_KEY), PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); final List<PipeConnectorSubtaskLifeCycle> pipeConnectorSubtaskLifeCycleList = new ArrayList<>(connectorNum); final String connectorKey = pipeConnectorParameters.getStringOrDefault( - PipeConnectorConstant.CONNECTOR_KEY, + Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY), BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()); // Shared pending queue for all subtasks final BoundedBlockingPendingQueue<Event> pendingQueue =
