This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-ssl-refactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b72075ec006f0ef5fe5cc7f291128f6d4ce1f54b Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Dec 13 16:32:46 2023 +0800 Pipe: iotdb-ssl-sink --- .../agent/plugin/PipeConnectorConstructor.java | 4 +++ .../thrift/sync/IoTDBThriftSyncConnector.java | 32 +++++++++++++++------- .../pipe/plugin/builtin/BuiltinPipePlugin.java | 8 +++++- .../builtin/connector/IoTDBSslConnector.java | 28 +++++++++++++++++++ 4 files changed, 61 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java index 0137f1c3c7e..6ab65c67282 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java @@ -56,6 +56,8 @@ public class PipeConnectorConstructor extends PipePluginConstructor { PLUGIN_CONSTRUCTORS.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(), IoTDBLegacyPipeConnector::new); + PLUGIN_CONSTRUCTORS.put( + BuiltinPipePlugin.IOTDB_SSL_CONNECTOR.getPipePluginName(), IoTDBThriftSyncConnector::new); PLUGIN_CONSTRUCTORS.put( BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), IoTDBAirGapConnector::new); PLUGIN_CONSTRUCTORS.put( @@ -78,6 +80,8 @@ public class PipeConnectorConstructor extends PipePluginConstructor { PLUGIN_CONSTRUCTORS.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(), IoTDBLegacyPipeConnector::new); + PLUGIN_CONSTRUCTORS.put( + BuiltinPipePlugin.IOTDB_SSL_SINK.getPipePluginName(), IoTDBThriftSyncConnector::new); PLUGIN_CONSTRUCTORS.put( BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBAirGapConnector::new); PLUGIN_CONSTRUCTORS.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java index 2f50a7081b0..c85d2cfa6f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java @@ -67,9 +67,13 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_SSL_CONNECTOR; +import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_SSL_SINK; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_KEY; public class IoTDBThriftSyncConnector extends IoTDBConnector { @@ -81,7 +85,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector { private final List<Boolean> isClientAlive = new ArrayList<>(); private boolean useSSL; - private String trustStore; + private String trustStorePath; private String trustStorePwd; private long currentClientIndex = 0; @@ -98,7 +102,10 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector { final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig(); final PipeParameters parameters = validator.getParameters(); - Set<TEndPoint> givenNodeUrls = parseNodeUrls(parameters); + + final String userSpecifiedConnectorName = + parameters.getStringByKeys(CONNECTOR_KEY, SINK_KEY).toLowerCase(); + final Set<TEndPoint> givenNodeUrls = parseNodeUrls(parameters); validator .validate( @@ -122,11 +129,11 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector { .validate( args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) args[2]), String.format( - "When %s is specified to true, %s and %s must be specified", - SINK_IOTDB_SSL_ENABLE_KEY, - SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, - SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY), - parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false), + "When ssl transport is enabled, %s and %s must be specified", + SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY), + IOTDB_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName) + || IOTDB_SSL_SINK.getClassName().equals(userSpecifiedConnectorName) + || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false), parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)); } @@ -145,8 +152,13 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector { tabletBatchBuilder = new IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters); } - useSSL = parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false); - trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY); + final String userSpecifiedConnectorName = + parameters.getStringByKeys(CONNECTOR_KEY, SINK_KEY).toLowerCase(); + useSSL = + IOTDB_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName) + || IOTDB_SSL_SINK.getClassName().equals(userSpecifiedConnectorName) + || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false); + trustStorePath = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY); trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY); } @@ -184,7 +196,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector { ip, port, useSSL, - trustStore, + trustStorePath, trustStorePwd)); try { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index f2063aa80e2..d0e4c6af6af 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.plugin.builtin; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBAirGapConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeConnector; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSslConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector; @@ -54,6 +55,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", IoTDBThriftSyncConnector.class), IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncConnector.class), IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class), + IOTDB_SSL_CONNECTOR("iotdb-ssl-connector", IoTDBSslConnector.class), IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapConnector.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class), OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class), @@ -64,6 +66,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", IoTDBThriftSyncConnector.class), IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", IoTDBThriftAsyncConnector.class), IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", IoTDBLegacyPipeConnector.class), + IOTDB_SSL_SINK("iotdb-ssl-sink", IoTDBSslConnector.class), IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class), WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class), OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class), @@ -106,6 +109,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(), + IOTDB_SSL_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(), WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(), OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(), @@ -114,5 +118,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(), IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(), - WEBSOCKET_SINK.getPipePluginName().toUpperCase()))); + WEBSOCKET_SINK.getPipePluginName().toUpperCase(), + OPC_UA_SINK.getPipePluginName().toUpperCase(), + WRITE_BACK_SINK.getPipePluginName().toUpperCase()))); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSslConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSslConnector.java new file mode 100644 index 00000000000..00d8790185a --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSslConnector.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.plugin.builtin.connector; + +/** + * This class is a placeholder and should not be initialized. It represents the IoTDB SSL connector. + * There is a real implementation in the server module but cannot be imported here. The pipe agent + * in the server module will replace this class with the real implementation when initializing the + * IoTDB SSL connector. + */ +public class IoTDBSslConnector extends PlaceholderConnector {}
