This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 795d420db86 Pipe: rename iotdb-thrift-ssl-sink & allow not specifying
connector in kv attrs & support kv attrs that don't have sink. / processor. /
sink. prefixes (#11715)
795d420db86 is described below
commit 795d420db867d1a3a2f37155de09a62ea8bec83d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Dec 14 18:15:19 2023 +0800
Pipe: rename iotdb-thrift-ssl-sink & allow not specifying connector in kv
attrs & support kv attrs that don't have sink. / processor. / sink. prefixes
(#11715)
---
.../api/customizer/parameter/PipeParameters.java | 104 ++++++++++++++-------
.../persistence/pipe/PipePluginInfo.java | 13 +--
.../agent/plugin/PipeConnectorConstructor.java | 13 +--
.../thrift/sync/IoTDBThriftSyncConnector.java | 42 +++++++--
.../config/executor/ClusterConfigTaskExecutor.java | 1 +
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 8 +-
.../builtin/connector/IoTDBThriftSslConnector.java | 28 ++++++
7 files changed, 145 insertions(+), 64 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index c896825f342..89ebaa6bcb7 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -20,21 +20,26 @@
package org.apache.iotdb.pipe.api.customizer.parameter;
import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
- * Used in {@link PipeProcessor#customize(PipeParameters,
PipeProcessorRuntimeConfiguration)} and
- * {@link PipeConnector#customize(PipeParameters,
PipeConnectorRuntimeConfiguration)}.
+ * Used in {@link PipeExtractor#customize(PipeParameters,
PipeExtractorRuntimeConfiguration)} ,
+ * {@link PipeProcessor#customize(PipeParameters,
PipeProcessorRuntimeConfiguration)} and {@link
+ * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}.
*
- * <p>This class is used to parse the parameters in WITH PROCESSOR and WITH
CONNECTOR when creating
- * a pipe.
+ * <p>This class is used to parse the parameters in WITH SOURCE, WITH
PROCESSOR and WITH SINK when
+ * creating a pipe.
*
- * <p>The input parameters is the key-value pair attributes for customization.
+ * <p>The input parameters are the key-value pair attributes for customization.
*/
public class PipeParameters {
@@ -49,12 +54,12 @@ public class PipeParameters {
}
public boolean hasAttribute(String key) {
- return attributes.containsKey(key);
+ return attributes.containsKey(key) ||
attributes.containsKey(KeyReducer.reduce(key));
}
public boolean hasAnyAttributes(String... keys) {
for (final String key : keys) {
- if (attributes.containsKey(key)) {
+ if (hasAttribute(key)) {
return true;
}
}
@@ -62,37 +67,38 @@ public class PipeParameters {
}
public String getString(String key) {
- return attributes.get(key);
+ final String value = attributes.get(key);
+ return value != null ? value : attributes.get(KeyReducer.reduce(key));
}
public Boolean getBoolean(String key) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? null : Boolean.parseBoolean(value);
}
public Integer getInt(String key) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? null : Integer.parseInt(value);
}
public Long getLong(String key) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? null : Long.parseLong(value);
}
public Float getFloat(String key) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? null : Float.parseFloat(value);
}
public Double getDouble(String key) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? null : Double.parseDouble(value);
}
public String getStringByKeys(String... keys) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final String value = getString(key);
if (value != null) {
return value;
}
@@ -102,9 +108,9 @@ public class PipeParameters {
public Boolean getBooleanByKeys(String... keys) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final Boolean value = getBoolean(key);
if (value != null) {
- return Boolean.parseBoolean(value);
+ return value;
}
}
return null;
@@ -112,9 +118,9 @@ public class PipeParameters {
public Integer getIntByKeys(String... keys) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final Integer value = getInt(key);
if (value != null) {
- return Integer.parseInt(value);
+ return value;
}
}
return null;
@@ -122,9 +128,9 @@ public class PipeParameters {
public Long getLongByKeys(String... keys) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final Long value = getLong(key);
if (value != null) {
- return Long.parseLong(value);
+ return value;
}
}
return null;
@@ -132,9 +138,9 @@ public class PipeParameters {
public Float getFloatByKeys(String... keys) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final Float value = getFloat(key);
if (value != null) {
- return Float.parseFloat(value);
+ return value;
}
}
return null;
@@ -142,47 +148,47 @@ public class PipeParameters {
public Double getDoubleByKeys(String... keys) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final Double value = getDouble(key);
if (value != null) {
- return Double.parseDouble(value);
+ return value;
}
}
return null;
}
public String getStringOrDefault(String key, String defaultValue) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? defaultValue : value;
}
public boolean getBooleanOrDefault(String key, boolean defaultValue) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? defaultValue : Boolean.parseBoolean(value);
}
public int getIntOrDefault(String key, int defaultValue) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? defaultValue : Integer.parseInt(value);
}
public long getLongOrDefault(String key, long defaultValue) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? defaultValue : Long.parseLong(value);
}
public float getFloatOrDefault(String key, float defaultValue) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? defaultValue : Float.parseFloat(value);
}
public double getDoubleOrDefault(String key, double defaultValue) {
- String value = attributes.get(key);
+ final String value = getString(key);
return value == null ? defaultValue : Double.parseDouble(value);
}
public String getStringOrDefault(List<String> keys, String defaultValue) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final String value = getString(key);
if (value != null) {
return value;
}
@@ -192,7 +198,7 @@ public class PipeParameters {
public boolean getBooleanOrDefault(List<String> keys, boolean defaultValue) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final String value = getString(key);
if (value != null) {
return Boolean.parseBoolean(value);
}
@@ -202,7 +208,7 @@ public class PipeParameters {
public int getIntOrDefault(List<String> keys, int defaultValue) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final String value = getString(key);
if (value != null) {
return Integer.parseInt(value);
}
@@ -212,7 +218,7 @@ public class PipeParameters {
public long getLongOrDefault(List<String> keys, long defaultValue) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final String value = getString(key);
if (value != null) {
return Long.parseLong(value);
}
@@ -222,7 +228,7 @@ public class PipeParameters {
public float getFloatOrDefault(List<String> keys, float defaultValue) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final String value = getString(key);
if (value != null) {
return Float.parseFloat(value);
}
@@ -232,7 +238,7 @@ public class PipeParameters {
public double getDoubleOrDefault(List<String> keys, double defaultValue) {
for (final String key : keys) {
- final String value = attributes.get(key);
+ final String value = getString(key);
if (value != null) {
return Double.parseDouble(value);
}
@@ -261,4 +267,30 @@ public class PipeParameters {
public String toString() {
return attributes.toString();
}
+
+ private static class KeyReducer {
+
+ private static final Set<String> PREFIXES = new HashSet<>();
+
+ static {
+ PREFIXES.add("extractor.");
+ PREFIXES.add("source.");
+ PREFIXES.add("processor.");
+ PREFIXES.add("connector.");
+ PREFIXES.add("sink.");
+ }
+
+ static String reduce(String key) {
+ if (key == null) {
+ return null;
+ }
+ final String lowerCaseKey = key.toLowerCase();
+ for (final String prefix : PREFIXES) {
+ if (lowerCaseKey.startsWith(prefix)) {
+ return key.substring(prefix.length());
+ }
+ }
+ return key;
+ }
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index e4cde84ae6b..a5b9baaa877 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -58,6 +58,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR;
import static
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_EXTRACTOR;
+import static
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
public class PipePluginInfo implements SnapshotProcessor {
@@ -153,16 +154,10 @@ public class PipePluginInfo implements SnapshotProcessor {
final PipeParameters connectorParameters =
new PipeParameters(createPipeRequest.getConnectorAttributes());
- if (!connectorParameters.hasAnyAttributes(
- PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
- final String exceptionMessage =
- "Failed to create pipe, the pipe connector plugin is not specified";
- LOGGER.warn(exceptionMessage);
- throw new PipeException(exceptionMessage);
- }
final String connectorPluginName =
- connectorParameters.getStringByKeys(
- PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY);
+ connectorParameters.getStringOrDefault(
+ Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY),
+ IOTDB_THRIFT_CONNECTOR.getPipePluginName());
if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
final String exceptionMessage =
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
index 0137f1c3c7e..37da4da1272 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeConnectorConstructor.java
@@ -32,7 +32,6 @@ import
org.apache.iotdb.db.pipe.connector.protocol.websocket.WebSocketConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.writeback.WriteBackConnector;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import java.util.Arrays;
@@ -47,6 +46,9 @@ public class PipeConnectorConstructor extends
PipePluginConstructor {
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(),
IoTDBThriftAsyncConnector::new);
+ PLUGIN_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName(),
+ IoTDBThriftSyncConnector::new);
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(),
IoTDBThriftSyncConnector::new);
@@ -69,6 +71,8 @@ public class PipeConnectorConstructor extends
PipePluginConstructor {
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName(),
IoTDBThriftAsyncConnector::new);
+ PLUGIN_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK.getPipePluginName(),
IoTDBThriftSyncConnector::new);
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(),
IoTDBThriftSyncConnector::new);
@@ -91,13 +95,6 @@ public class PipeConnectorConstructor extends
PipePluginConstructor {
@Override
PipeConnector reflectPlugin(PipeParameters connectorParameters) {
- if (!connectorParameters.hasAnyAttributes(
- PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
- throw new PipeException(
- "Failed to reflect PipeConnector instance because "
- + "'connector' is not specified in the parameters.");
- }
-
return (PipeConnector)
reflectPluginByKey(
connectorParameters
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index f4a538f1b92..21d0eb82c2a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import com.google.common.collect.ImmutableList;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,9 +68,14 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
+import static
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR;
+import static
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_KEY;
public class IoTDBThriftSyncConnector extends IoTDBConnector {
@@ -81,7 +87,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
private final List<Boolean> isClientAlive = new ArrayList<>();
private boolean useSSL;
- private String trustStore;
+ private String trustStorePath;
private String trustStorePwd;
private long currentClientIndex = 0;
@@ -98,7 +104,14 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
final PipeParameters parameters = validator.getParameters();
- Set<TEndPoint> givenNodeUrls = parseNodeUrls(parameters);
+
+ final String userSpecifiedConnectorName =
+ parameters
+ .getStringOrDefault(
+ ImmutableList.of(CONNECTOR_KEY, SINK_KEY),
+ IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+ .toLowerCase();
+ final Set<TEndPoint> givenNodeUrls = parseNodeUrls(parameters);
validator
.validate(
@@ -122,11 +135,11 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
.validate(
args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean)
args[2]),
String.format(
- "When %s is specified to true, %s and %s must be specified",
- SINK_IOTDB_SSL_ENABLE_KEY,
- SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY,
- SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY),
- parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false),
+ "When ssl transport is enabled, %s and %s must be specified",
+ SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY,
SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY),
+
IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName)
+ ||
IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(userSpecifiedConnectorName)
+ || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY,
false),
parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY),
parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY));
}
@@ -145,8 +158,17 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
tabletBatchBuilder = new
IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
}
- useSSL = parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false);
- trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
+ final String userSpecifiedConnectorName =
+ parameters
+ .getStringOrDefault(
+ ImmutableList.of(CONNECTOR_KEY, SINK_KEY),
+ IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+ .toLowerCase();
+ useSSL =
+
IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName)
+ ||
IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(userSpecifiedConnectorName)
+ || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY,
false);
+ trustStorePath = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY);
}
@@ -184,7 +206,7 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
ip,
port,
useSSL,
- trustStore,
+ trustStorePath,
trustStorePwd));
try {
final TPipeTransferResp resp =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b6c7a18db72..553564fdc12 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1551,6 +1551,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
try {
PipeAgent.plugin().validate(createPipeStatement);
} catch (Exception e) {
+ LOGGER.info("Failed to validate pipe statement, because {}",
e.getMessage(), e);
future.setException(
new IoTDBException(e.getMessage(),
TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index f2063aa80e2..cf7f5d7b4df 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBAirGapConnect
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSslConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.OpcUaConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector;
@@ -51,6 +52,7 @@ public enum BuiltinPipePlugin {
// connectors
DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
IOTDB_THRIFT_CONNECTOR("iotdb-thrift-connector", IoTDBThriftConnector.class),
+ IOTDB_THRIFT_SSL_CONNECTOR("iotdb-thrift-ssl-connector",
IoTDBThriftSslConnector.class),
IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector",
IoTDBThriftSyncConnector.class),
IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector",
IoTDBThriftAsyncConnector.class),
IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector",
IoTDBLegacyPipeConnector.class),
@@ -61,6 +63,7 @@ public enum BuiltinPipePlugin {
DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class),
IOTDB_THRIFT_SINK("iotdb-thrift-sink", IoTDBThriftConnector.class),
+ IOTDB_THRIFT_SSL_SINK("iotdb-thrift-ssl-sink",
IoTDBThriftSslConnector.class),
IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink",
IoTDBThriftSyncConnector.class),
IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink",
IoTDBThriftAsyncConnector.class),
IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink",
IoTDBLegacyPipeConnector.class),
@@ -103,6 +106,7 @@ public enum BuiltinPipePlugin {
// Connectors
DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
+ IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
@@ -114,5 +118,7 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
- WEBSOCKET_SINK.getPipePluginName().toUpperCase())));
+ WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
+ OPC_UA_SINK.getPipePluginName().toUpperCase(),
+ WRITE_BACK_SINK.getPipePluginName().toUpperCase())));
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftSslConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftSslConnector.java
new file mode 100644
index 00000000000..f8416746449
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftSslConnector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents
the IoTDB SSL connector.
+ * There is a real implementation in the server module but cannot be imported
here. The pipe agent
+ * in the server module will replace this class with the real implementation
when initializing the
+ * IoTDB SSL connector.
+ */
+public class IoTDBThriftSslConnector extends PlaceholderConnector {}