This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-ssl-refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b72075ec006f0ef5fe5cc7f291128f6d4ce1f54b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Dec 13 16:32:46 2023 +0800

    Pipe: iotdb-ssl-sink
---
 .../agent/plugin/PipeConnectorConstructor.java     |  4 +++
 .../thrift/sync/IoTDBThriftSyncConnector.java      | 32 +++++++++++++++-------
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |  8 +++++-
 .../builtin/connector/IoTDBSslConnector.java       | 28 +++++++++++++++++++
 4 files changed, 61 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
index 0137f1c3c7e..6ab65c67282 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
@@ -56,6 +56,8 @@ public class PipeConnectorConstructor extends 
PipePluginConstructor {
     PLUGIN_CONSTRUCTORS.put(
         BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
         IoTDBLegacyPipeConnector::new);
+    PLUGIN_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_SSL_CONNECTOR.getPipePluginName(), 
IoTDBThriftSyncConnector::new);
     PLUGIN_CONSTRUCTORS.put(
         BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), 
IoTDBAirGapConnector::new);
     PLUGIN_CONSTRUCTORS.put(
@@ -78,6 +80,8 @@ public class PipeConnectorConstructor extends 
PipePluginConstructor {
     PLUGIN_CONSTRUCTORS.put(
         BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(),
         IoTDBLegacyPipeConnector::new);
+    PLUGIN_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_SSL_SINK.getPipePluginName(), 
IoTDBThriftSyncConnector::new);
     PLUGIN_CONSTRUCTORS.put(
         BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), 
IoTDBAirGapConnector::new);
     PLUGIN_CONSTRUCTORS.put(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index 2f50a7081b0..c85d2cfa6f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -67,9 +67,13 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_SSL_CONNECTOR;
+import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_SSL_SINK;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_KEY;
 
 public class IoTDBThriftSyncConnector extends IoTDBConnector {
 
@@ -81,7 +85,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
   private final List<Boolean> isClientAlive = new ArrayList<>();
 
   private boolean useSSL;
-  private String trustStore;
+  private String trustStorePath;
   private String trustStorePwd;
 
   private long currentClientIndex = 0;
@@ -98,7 +102,10 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
 
     final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
     final PipeParameters parameters = validator.getParameters();
-    Set<TEndPoint> givenNodeUrls = parseNodeUrls(parameters);
+
+    final String userSpecifiedConnectorName =
+        parameters.getStringByKeys(CONNECTOR_KEY, SINK_KEY).toLowerCase();
+    final Set<TEndPoint> givenNodeUrls = parseNodeUrls(parameters);
 
     validator
         .validate(
@@ -122,11 +129,11 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
         .validate(
             args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) 
args[2]),
             String.format(
-                "When %s is specified to true, %s and %s must be specified",
-                SINK_IOTDB_SSL_ENABLE_KEY,
-                SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY,
-                SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY),
-            parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false),
+                "When ssl transport is enabled, %s and %s must be specified",
+                SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, 
SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY),
+            
IOTDB_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName)
+                || 
IOTDB_SSL_SINK.getClassName().equals(userSpecifiedConnectorName)
+                || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, 
false),
             parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY),
             parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY));
   }
@@ -145,8 +152,13 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
       tabletBatchBuilder = new 
IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
     }
 
-    useSSL = parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false);
-    trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
+    final String userSpecifiedConnectorName =
+        parameters.getStringByKeys(CONNECTOR_KEY, SINK_KEY).toLowerCase();
+    useSSL =
+        
IOTDB_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName)
+            || IOTDB_SSL_SINK.getClassName().equals(userSpecifiedConnectorName)
+            || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, 
false);
+    trustStorePath = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
     trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY);
   }
 
@@ -184,7 +196,7 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
               ip,
               port,
               useSSL,
-              trustStore,
+              trustStorePath,
               trustStorePwd));
 
       try {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index f2063aa80e2..d0e4c6af6af 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.plugin.builtin;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBAirGapConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeConnector;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSslConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector;
@@ -54,6 +55,7 @@ public enum BuiltinPipePlugin {
   IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", 
IoTDBThriftSyncConnector.class),
   IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", 
IoTDBThriftAsyncConnector.class),
   IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", 
IoTDBLegacyPipeConnector.class),
+  IOTDB_SSL_CONNECTOR("iotdb-ssl-connector", IoTDBSslConnector.class),
   IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", 
IoTDBAirGapConnector.class),
   WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
   OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class),
@@ -64,6 +66,7 @@ public enum BuiltinPipePlugin {
   IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", 
IoTDBThriftSyncConnector.class),
   IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", 
IoTDBThriftAsyncConnector.class),
   IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", 
IoTDBLegacyPipeConnector.class),
+  IOTDB_SSL_SINK("iotdb-ssl-sink", IoTDBSslConnector.class),
   IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class),
   WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class),
   OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class),
@@ -106,6 +109,7 @@ public enum BuiltinPipePlugin {
                   
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
+                  IOTDB_SSL_CONNECTOR.getPipePluginName().toUpperCase(),
                   IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
                   WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
                   OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
@@ -114,5 +118,7 @@ public enum BuiltinPipePlugin {
                   IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
                   IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
                   IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
-                  WEBSOCKET_SINK.getPipePluginName().toUpperCase())));
+                  WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
+                  OPC_UA_SINK.getPipePluginName().toUpperCase(),
+                  WRITE_BACK_SINK.getPipePluginName().toUpperCase())));
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSslConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSslConnector.java
new file mode 100644
index 00000000000..00d8790185a
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSslConnector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the IoTDB SSL connector.
+ * There is a real implementation in the server module but cannot be imported here. The pipe agent
+ * in the server module will replace this class with the real implementation when initializing the
+ * IoTDB SSL connector.
+ */
+public class IoTDBSslConnector extends PlaceholderConnector {}

Reply via email to