This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 795d420db86 Pipe: rename iotdb-thrift-ssl-sink & allow not specifying 
connector in kv attrs & support kv attrs that don't have sink. / processor. / 
sink. prefixes (#11715)
795d420db86 is described below

commit 795d420db867d1a3a2f37155de09a62ea8bec83d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Dec 14 18:15:19 2023 +0800

    Pipe: rename iotdb-thrift-ssl-sink & allow not specifying connector in kv 
attrs & support kv attrs that don't have sink. / processor. / sink. prefixes 
(#11715)
---
 .../api/customizer/parameter/PipeParameters.java   | 104 ++++++++++++++-------
 .../persistence/pipe/PipePluginInfo.java           |  13 +--
 .../agent/plugin/PipeConnectorConstructor.java     |  13 +--
 .../thrift/sync/IoTDBThriftSyncConnector.java      |  42 +++++++--
 .../config/executor/ClusterConfigTaskExecutor.java |   1 +
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   8 +-
 .../builtin/connector/IoTDBThriftSslConnector.java |  28 ++++++
 7 files changed, 145 insertions(+), 64 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index c896825f342..89ebaa6bcb7 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -20,21 +20,26 @@
 package org.apache.iotdb.pipe.api.customizer.parameter;
 
 import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.PipeExtractor;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
- * Used in {@link PipeProcessor#customize(PipeParameters, 
PipeProcessorRuntimeConfiguration)} and
- * {@link PipeConnector#customize(PipeParameters, 
PipeConnectorRuntimeConfiguration)}.
+ * Used in {@link PipeExtractor#customize(PipeParameters, 
PipeExtractorRuntimeConfiguration)} ,
+ * {@link PipeProcessor#customize(PipeParameters, 
PipeProcessorRuntimeConfiguration)} and {@link
+ * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}.
  *
- * <p>This class is used to parse the parameters in WITH PROCESSOR and WITH 
CONNECTOR when creating
- * a pipe.
+ * <p>This class is used to parse the parameters in WITH SOURCE, WITH 
PROCESSOR and WITH SINK when
+ * creating a pipe.
  *
- * <p>The input parameters is the key-value pair attributes for customization.
+ * <p>The input parameters are the key-value pair attributes for customization.
  */
 public class PipeParameters {
 
@@ -49,12 +54,12 @@ public class PipeParameters {
   }
 
   public boolean hasAttribute(String key) {
-    return attributes.containsKey(key);
+    return attributes.containsKey(key) || 
attributes.containsKey(KeyReducer.reduce(key));
   }
 
   public boolean hasAnyAttributes(String... keys) {
     for (final String key : keys) {
-      if (attributes.containsKey(key)) {
+      if (hasAttribute(key)) {
         return true;
       }
     }
@@ -62,37 +67,38 @@ public class PipeParameters {
   }
 
   public String getString(String key) {
-    return attributes.get(key);
+    final String value = attributes.get(key);
+    return value != null ? value : attributes.get(KeyReducer.reduce(key));
   }
 
   public Boolean getBoolean(String key) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? null : Boolean.parseBoolean(value);
   }
 
   public Integer getInt(String key) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? null : Integer.parseInt(value);
   }
 
   public Long getLong(String key) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? null : Long.parseLong(value);
   }
 
   public Float getFloat(String key) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? null : Float.parseFloat(value);
   }
 
   public Double getDouble(String key) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? null : Double.parseDouble(value);
   }
 
   public String getStringByKeys(String... keys) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final String value = getString(key);
       if (value != null) {
         return value;
       }
@@ -102,9 +108,9 @@ public class PipeParameters {
 
   public Boolean getBooleanByKeys(String... keys) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final Boolean value = getBoolean(key);
       if (value != null) {
-        return Boolean.parseBoolean(value);
+        return value;
       }
     }
     return null;
@@ -112,9 +118,9 @@ public class PipeParameters {
 
   public Integer getIntByKeys(String... keys) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final Integer value = getInt(key);
       if (value != null) {
-        return Integer.parseInt(value);
+        return value;
       }
     }
     return null;
@@ -122,9 +128,9 @@ public class PipeParameters {
 
   public Long getLongByKeys(String... keys) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final Long value = getLong(key);
       if (value != null) {
-        return Long.parseLong(value);
+        return value;
       }
     }
     return null;
@@ -132,9 +138,9 @@ public class PipeParameters {
 
   public Float getFloatByKeys(String... keys) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final Float value = getFloat(key);
       if (value != null) {
-        return Float.parseFloat(value);
+        return value;
       }
     }
     return null;
@@ -142,47 +148,47 @@ public class PipeParameters {
 
   public Double getDoubleByKeys(String... keys) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final Double value = getDouble(key);
       if (value != null) {
-        return Double.parseDouble(value);
+        return value;
       }
     }
     return null;
   }
 
   public String getStringOrDefault(String key, String defaultValue) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? defaultValue : value;
   }
 
   public boolean getBooleanOrDefault(String key, boolean defaultValue) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? defaultValue : Boolean.parseBoolean(value);
   }
 
   public int getIntOrDefault(String key, int defaultValue) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? defaultValue : Integer.parseInt(value);
   }
 
   public long getLongOrDefault(String key, long defaultValue) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? defaultValue : Long.parseLong(value);
   }
 
   public float getFloatOrDefault(String key, float defaultValue) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? defaultValue : Float.parseFloat(value);
   }
 
   public double getDoubleOrDefault(String key, double defaultValue) {
-    String value = attributes.get(key);
+    final String value = getString(key);
     return value == null ? defaultValue : Double.parseDouble(value);
   }
 
   public String getStringOrDefault(List<String> keys, String defaultValue) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final String value = getString(key);
       if (value != null) {
         return value;
       }
@@ -192,7 +198,7 @@ public class PipeParameters {
 
   public boolean getBooleanOrDefault(List<String> keys, boolean defaultValue) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final String value = getString(key);
       if (value != null) {
         return Boolean.parseBoolean(value);
       }
@@ -202,7 +208,7 @@ public class PipeParameters {
 
   public int getIntOrDefault(List<String> keys, int defaultValue) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final String value = getString(key);
       if (value != null) {
         return Integer.parseInt(value);
       }
@@ -212,7 +218,7 @@ public class PipeParameters {
 
   public long getLongOrDefault(List<String> keys, long defaultValue) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final String value = getString(key);
       if (value != null) {
         return Long.parseLong(value);
       }
@@ -222,7 +228,7 @@ public class PipeParameters {
 
   public float getFloatOrDefault(List<String> keys, float defaultValue) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final String value = getString(key);
       if (value != null) {
         return Float.parseFloat(value);
       }
@@ -232,7 +238,7 @@ public class PipeParameters {
 
   public double getDoubleOrDefault(List<String> keys, double defaultValue) {
     for (final String key : keys) {
-      final String value = attributes.get(key);
+      final String value = getString(key);
       if (value != null) {
         return Double.parseDouble(value);
       }
@@ -261,4 +267,30 @@ public class PipeParameters {
   public String toString() {
     return attributes.toString();
   }
+
+  private static class KeyReducer {
+
+    private static final Set<String> PREFIXES = new HashSet<>();
+
+    static {
+      PREFIXES.add("extractor.");
+      PREFIXES.add("source.");
+      PREFIXES.add("processor.");
+      PREFIXES.add("connector.");
+      PREFIXES.add("sink.");
+    }
+
+    static String reduce(String key) {
+      if (key == null) {
+        return null;
+      }
+      final String lowerCaseKey = key.toLowerCase();
+      for (final String prefix : PREFIXES) {
+        if (lowerCaseKey.startsWith(prefix)) {
+          return key.substring(prefix.length());
+        }
+      }
+      return key;
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index e4cde84ae6b..a5b9baaa877 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -58,6 +58,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR;
 import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_EXTRACTOR;
+import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
 
 public class PipePluginInfo implements SnapshotProcessor {
 
@@ -153,16 +154,10 @@ public class PipePluginInfo implements SnapshotProcessor {
 
     final PipeParameters connectorParameters =
         new PipeParameters(createPipeRequest.getConnectorAttributes());
-    if (!connectorParameters.hasAnyAttributes(
-        PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
-      final String exceptionMessage =
-          "Failed to create pipe, the pipe connector plugin is not specified";
-      LOGGER.warn(exceptionMessage);
-      throw new PipeException(exceptionMessage);
-    }
     final String connectorPluginName =
-        connectorParameters.getStringByKeys(
-            PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY);
+        connectorParameters.getStringOrDefault(
+            Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY),
+            IOTDB_THRIFT_CONNECTOR.getPipePluginName());
     if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
       final String exceptionMessage =
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
index 0137f1c3c7e..37da4da1272 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
@@ -32,7 +32,6 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.websocket.WebSocketConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.writeback.WriteBackConnector;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import java.util.Arrays;
 
@@ -47,6 +46,9 @@ public class PipeConnectorConstructor extends 
PipePluginConstructor {
     PLUGIN_CONSTRUCTORS.put(
         BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(),
         IoTDBThriftAsyncConnector::new);
+    PLUGIN_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName(),
+        IoTDBThriftSyncConnector::new);
     PLUGIN_CONSTRUCTORS.put(
         BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(),
         IoTDBThriftSyncConnector::new);
@@ -69,6 +71,8 @@ public class PipeConnectorConstructor extends 
PipePluginConstructor {
 
     PLUGIN_CONSTRUCTORS.put(
         BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName(), 
IoTDBThriftAsyncConnector::new);
+    PLUGIN_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK.getPipePluginName(), 
IoTDBThriftSyncConnector::new);
     PLUGIN_CONSTRUCTORS.put(
         BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(),
         IoTDBThriftSyncConnector::new);
@@ -91,13 +95,6 @@ public class PipeConnectorConstructor extends 
PipePluginConstructor {
 
   @Override
   PipeConnector reflectPlugin(PipeParameters connectorParameters) {
-    if (!connectorParameters.hasAnyAttributes(
-        PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
-      throw new PipeException(
-          "Failed to reflect PipeConnector instance because "
-              + "'connector' is not specified in the parameters.");
-    }
-
     return (PipeConnector)
         reflectPluginByKey(
             connectorParameters
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index f4a538f1b92..21d0eb82c2a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,9 +68,14 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
+import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR;
+import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_KEY;
 
 public class IoTDBThriftSyncConnector extends IoTDBConnector {
 
@@ -81,7 +87,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
   private final List<Boolean> isClientAlive = new ArrayList<>();
 
   private boolean useSSL;
-  private String trustStore;
+  private String trustStorePath;
   private String trustStorePwd;
 
   private long currentClientIndex = 0;
@@ -98,7 +104,14 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
 
     final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
     final PipeParameters parameters = validator.getParameters();
-    Set<TEndPoint> givenNodeUrls = parseNodeUrls(parameters);
+
+    final String userSpecifiedConnectorName =
+        parameters
+            .getStringOrDefault(
+                ImmutableList.of(CONNECTOR_KEY, SINK_KEY),
+                IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+            .toLowerCase();
+    final Set<TEndPoint> givenNodeUrls = parseNodeUrls(parameters);
 
     validator
         .validate(
@@ -122,11 +135,11 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
         .validate(
             args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) 
args[2]),
             String.format(
-                "When %s is specified to true, %s and %s must be specified",
-                SINK_IOTDB_SSL_ENABLE_KEY,
-                SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY,
-                SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY),
-            parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false),
+                "When ssl transport is enabled, %s and %s must be specified",
+                SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, 
SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY),
+            
IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName)
+                || 
IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(userSpecifiedConnectorName)
+                || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, 
false),
             parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY),
             parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY));
   }
@@ -145,8 +158,17 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
       tabletBatchBuilder = new 
IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
     }
 
-    useSSL = parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false);
-    trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
+    final String userSpecifiedConnectorName =
+        parameters
+            .getStringOrDefault(
+                ImmutableList.of(CONNECTOR_KEY, SINK_KEY),
+                IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+            .toLowerCase();
+    useSSL =
+        
IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName)
+            || 
IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(userSpecifiedConnectorName)
+            || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, 
false);
+    trustStorePath = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
     trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY);
   }
 
@@ -184,7 +206,7 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
               ip,
               port,
               useSSL,
-              trustStore,
+              trustStorePath,
               trustStorePwd));
       try {
         final TPipeTransferResp resp =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b6c7a18db72..553564fdc12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1551,6 +1551,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     try {
       PipeAgent.plugin().validate(createPipeStatement);
     } catch (Exception e) {
+      LOGGER.info("Failed to validate pipe statement, because {}", 
e.getMessage(), e);
       future.setException(
           new IoTDBException(e.getMessage(), 
TSStatusCode.PIPE_ERROR.getStatusCode()));
       return future;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index f2063aa80e2..cf7f5d7b4df 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBAirGapConnect
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSslConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector;
 import org.apache.iotdb.commons.pipe.plugin.builtin.connector.OpcUaConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector;
@@ -51,6 +52,7 @@ public enum BuiltinPipePlugin {
   // connectors
   DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
   IOTDB_THRIFT_CONNECTOR("iotdb-thrift-connector", IoTDBThriftConnector.class),
+  IOTDB_THRIFT_SSL_CONNECTOR("iotdb-thrift-ssl-connector", 
IoTDBThriftSslConnector.class),
   IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", 
IoTDBThriftSyncConnector.class),
   IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", 
IoTDBThriftAsyncConnector.class),
   IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", 
IoTDBLegacyPipeConnector.class),
@@ -61,6 +63,7 @@ public enum BuiltinPipePlugin {
 
   DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class),
   IOTDB_THRIFT_SINK("iotdb-thrift-sink", IoTDBThriftConnector.class),
+  IOTDB_THRIFT_SSL_SINK("iotdb-thrift-ssl-sink", 
IoTDBThriftSslConnector.class),
   IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", 
IoTDBThriftSyncConnector.class),
   IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", 
IoTDBThriftAsyncConnector.class),
   IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", 
IoTDBLegacyPipeConnector.class),
@@ -103,6 +106,7 @@ public enum BuiltinPipePlugin {
                   // Connectors
                   DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
                   IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
+                  IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().toUpperCase(),
                   
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
@@ -114,5 +118,7 @@ public enum BuiltinPipePlugin {
                   IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
                   IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
                   IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
-                  WEBSOCKET_SINK.getPipePluginName().toUpperCase())));
+                  WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
+                  OPC_UA_SINK.getPipePluginName().toUpperCase(),
+                  WRITE_BACK_SINK.getPipePluginName().toUpperCase())));
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftSslConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftSslConnector.java
new file mode 100644
index 00000000000..f8416746449
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftSslConnector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents 
the IoTDB SSL connector.
+ * There is a real implementation in the server module but cannot be imported 
here. The pipe agent
+ * in the server module will replace this class with the real implementation 
when initializing the
+ * IoTDB SSL connector.
+ */
+public class IoTDBThriftSslConnector extends PlaceholderConnector {}

Reply via email to