This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dd7bada581d Pipe: conceal SSL trust store password & improve exception
messages for sink connection establishment (#11806)
dd7bada581d is described below
commit dd7bada581ddd592f0dbd2fe5af1f3246075454c
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Jan 2 14:29:52 2024 +0800
Pipe: conceal SSL trust store password & improve exception messages for
sink connection establishment (#11806)
1. Concealing the `ssl.trust-store-pwd` in parameters.
2. Implementing stricter checks for SSL in the case of async connector.
3. Standardizing exceptions thrown during the construction of
`IoTDBThriftSyncConnectorClient` as `PipeConnectionException`.
4. Standardizing exceptions thrown during the parsing of node URLs as
`PipeParameterNotValidException`.
---
.../api/customizer/parameter/PipeParameters.java | 33 ++++++++-
.../api/exception/PipeConnectionException.java | 2 +
.../exception/PipeParameterNotValidException.java | 3 +
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 25 ++++---
.../thrift/async/IoTDBThriftAsyncConnector.java | 10 ++-
.../thrift/sync/IoTDBThriftSyncClientManager.java | 39 ++++++----
.../builtin/connector/iotdb/IoTDBConnector.java | 82 +++++++++++++++-------
.../connector/iotdb/thrift/IoTDBMetaConnector.java | 34 +++++----
.../commons/pipe/task/meta/PipeStaticMeta.java | 6 +-
9 files changed, 160 insertions(+), 74 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 89ebaa6bcb7..9a12f8e315c 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -29,7 +29,10 @@ import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeCo
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Used in {@link PipeExtractor#customize(PipeParameters,
PipeExtractorRuntimeConfiguration)} ,
@@ -263,9 +266,17 @@ public class PipeParameters {
return attributes.hashCode();
}
+ /**
+ * When exposing the content of this `PipeParameters` to external sources
(such as `show pipes` or
+ * logging), please use this `toString` method to prevent the leakage of
private information.
+ */
@Override
public String toString() {
- return attributes.toString();
+ return attributes.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Entry::getKey, entry -> ValueHider.hide(entry.getKey(),
entry.getValue())))
+ .toString();
}
private static class KeyReducer {
@@ -293,4 +304,24 @@ public class PipeParameters {
return key;
}
}
+
+ public static class ValueHider {
+ private static final Set<String> KEYS = new HashSet<>();
+
+ private static final String PLACEHOLDER = "******";
+
+ static {
+ KEYS.add("ssl.trust-store-pwd");
+ }
+
+ static String hide(String key, String value) {
+ if (Objects.isNull(key)) {
+ return value;
+ }
+ if (KEYS.contains(KeyReducer.reduce(key))) {
+ return PLACEHOLDER;
+ }
+ return value;
+ }
+ }
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
index 14cacc73fe9..7ad133a7190 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.pipe.api.exception;
public class PipeConnectionException extends PipeException {
+ public static String CONNECTION_ERROR_FORMATTER = "Connect to receiver %s:%s
error, because: %s";
+
public PipeConnectionException(String message) {
super(message);
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
index 9788bb3f4a3..c11513b007f 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.pipe.api.exception;
public class PipeParameterNotValidException extends PipeException {
+ public static String PARSE_URL_ERROR_FORMATTER =
+ "Error parsing node urls from target servers %s, because %s";
+
public PipeParameterNotValidException(String message) {
super(message);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index f3c6881f5f3..d946aa28bfe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -216,19 +216,18 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
public void handshake() throws Exception {
close();
- client =
- new IoTDBThriftSyncConnectorClient(
- new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
-
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
- .build(),
- ipAddress,
- port,
- useSSL,
- trustStore,
- trustStorePwd);
-
try {
+ client =
+ new IoTDBThriftSyncConnectorClient(
+ new ThriftClientProperty.Builder()
+
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
+
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
+ .build(),
+ ipAddress,
+ port,
+ useSSL,
+ trustStore,
+ trustStorePwd);
final TSyncIdentityInfo identityInfo =
new TSyncIdentityInfo(
pipeName, creationTime, syncConnectorVersion,
IoTDBConstant.PATH_ROOT);
@@ -244,7 +243,7 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
} catch (TException e) {
throw new PipeConnectionException(
String.format(
- "Connect to receiver %s:%s error, because: %s", ipAddress, port,
e.getMessage()),
+ PipeConnectionException.CONNECTION_ERROR_FORMATTER, ipAddress,
port, e.getMessage()),
e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index e8460af85bb..a66ce02a630 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -60,6 +60,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
public class IoTDBThriftAsyncConnector extends IoTDBConnector {
@@ -94,9 +96,11 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
final PipeParameters parameters = validator.getParameters();
validator.validate(
- useSSL -> !((boolean) useSSL),
- "IoTDBThriftAsyncConnector does not support SSL transmission
currently",
- parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false));
+ args -> !((boolean) args[0] || (boolean) args[1] || (boolean) args[2]),
+ "Only 'iotdb-thrift-ssl-sink' supports SSL transmission currently.",
+ parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false),
+ parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY),
+ parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
index 4425439c07f..c2873838e4a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
@@ -72,8 +72,7 @@ public class IoTDBThriftSyncClientManager extends
IoTDBThriftClientManager imple
}
}
- public void checkClientStatusAndTryReconstructIfNecessary()
- throws IOException, TTransportException {
+ public void checkClientStatusAndTryReconstructIfNecessary() throws
IOException {
// reconstruct all dead clients
for (final Map.Entry<TEndPoint, Pair<IoTDBThriftSyncConnectorClient,
Boolean>> entry :
endPoint2ClientAndStatus.entrySet()) {
@@ -96,7 +95,7 @@ public class IoTDBThriftSyncClientManager extends
IoTDBThriftClientManager imple
"All target servers %s are not available.",
endPoint2ClientAndStatus.keySet()));
}
- private void reconstructClient(TEndPoint endPoint) throws
TTransportException, IOException {
+ private void reconstructClient(TEndPoint endPoint) throws IOException {
final Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus =
endPoint2ClientAndStatus.get(endPoint);
@@ -112,18 +111,28 @@ public class IoTDBThriftSyncClientManager extends
IoTDBThriftClientManager imple
}
}
- clientAndStatus.setLeft(
- new IoTDBThriftSyncConnectorClient(
- new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs((int)
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
- .setRpcThriftCompressionEnabled(
- PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
- .build(),
- endPoint.getIp(),
- endPoint.getPort(),
- useSSL,
- trustStorePath,
- trustStorePwd));
+ try {
+ clientAndStatus.setLeft(
+ new IoTDBThriftSyncConnectorClient(
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs((int)
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+ .setRpcThriftCompressionEnabled(
+ PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+ .build(),
+ endPoint.getIp(),
+ endPoint.getPort(),
+ useSSL,
+ trustStorePath,
+ trustStorePwd));
+ } catch (TTransportException e) {
+ throw new PipeConnectionException(
+ String.format(
+ PipeConnectionException.CONNECTION_ERROR_FORMATTER,
+ endPoint.getIp(),
+ endPoint.getPort(),
+ e.getMessage()),
+ e);
+ }
try {
final TPipeTransferResp resp =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
index 6c4ad1ac634..b88d420c5e2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
@@ -92,37 +94,65 @@ public abstract class IoTDBConnector implements
PipeConnector {
LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
}
- protected Set<TEndPoint> parseNodeUrls(PipeParameters parameters) {
+ protected Set<TEndPoint> parseNodeUrls(PipeParameters parameters)
+ throws PipeParameterNotValidException {
final Set<TEndPoint> givenNodeUrls = new HashSet<>(nodeUrls);
- if (parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY)
- && parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
- givenNodeUrls.add(
- new TEndPoint(
- parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY),
- parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
+ try {
+ if (parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY)
+ && parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
+ givenNodeUrls.add(
+ new TEndPoint(
+ parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY),
+ parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
+ }
+
+ if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
+ && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
+ givenNodeUrls.add(
+ new TEndPoint(
+ parameters.getStringByKeys(SINK_IOTDB_IP_KEY),
+ parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
+ }
+
+ if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
+ givenNodeUrls.addAll(
+ NodeUrlUtils.parseTEndPointUrls(
+ Arrays.asList(
+
parameters.getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
+ }
+
+ if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
+ givenNodeUrls.addAll(
+ NodeUrlUtils.parseTEndPointUrls(
+
Arrays.asList(parameters.getStringByKeys(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+ }
+ } catch (Exception e) {
+ throw new PipeParameterNotValidException(
+ String.format(
+ PipeParameterNotValidException.PARSE_URL_ERROR_FORMATTER,
givenNodeUrls, e));
}
- if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
- && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
- givenNodeUrls.add(
- new TEndPoint(
- parameters.getStringByKeys(SINK_IOTDB_IP_KEY),
- parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
- }
-
- if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
- givenNodeUrls.addAll(
- NodeUrlUtils.parseTEndPointUrls(
-
Arrays.asList(parameters.getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
- }
+ checkNodeUrls(givenNodeUrls);
+ return givenNodeUrls;
+ }
- if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
- givenNodeUrls.addAll(
- NodeUrlUtils.parseTEndPointUrls(
-
Arrays.asList(parameters.getStringByKeys(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+ private void checkNodeUrls(Set<TEndPoint> nodeUrls) throws
PipeParameterNotValidException {
+ for (TEndPoint nodeUrl : nodeUrls) {
+ if (Objects.isNull(nodeUrl.ip) || nodeUrl.ip.isEmpty()) {
+ throw new PipeParameterNotValidException(
+ String.format(
+ PipeParameterNotValidException.PARSE_URL_ERROR_FORMATTER,
+ nodeUrls,
+ "ip cannot be empty"));
+ }
+ if (nodeUrl.port == 0) {
+ throw new PipeParameterNotValidException(
+ String.format(
+ PipeParameterNotValidException.PARSE_URL_ERROR_FORMATTER,
+ nodeUrls,
+ "port cannot be empty"));
+ }
}
-
- return givenNodeUrls;
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/thrift/IoTDBMetaConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/thrift/IoTDBMetaConnector.java
index bc244a9aa63..7563fb19f68 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/thrift/IoTDBMetaConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/thrift/IoTDBMetaConnector.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,19 +169,26 @@ public abstract class IoTDBMetaConnector extends
IoTDBConnector {
}
}
- clients.set(
- i,
- new IoTDBThriftSyncConnectorClient(
- new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs((int)
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
- .setRpcThriftCompressionEnabled(
- PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
- .build(),
- ip,
- port,
- false,
- null,
- null));
+ try {
+ clients.set(
+ i,
+ new IoTDBThriftSyncConnectorClient(
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs((int)
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+ .setRpcThriftCompressionEnabled(
+
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+ .build(),
+ ip,
+ port,
+ false,
+ null,
+ null));
+ } catch (TTransportException e) {
+ throw new PipeConnectionException(
+ String.format(
+ PipeConnectionException.CONNECTION_ERROR_FORMATTER, ip, port,
e.getMessage()),
+ e);
+ }
// TODO: validate client connectivity here, just like in ThriftSync.
isClientAlive.set(i, true);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index 9be4ce369d1..1f393bd9b3f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -221,11 +221,11 @@ public class PipeStaticMeta {
+ "', creationTime="
+ creationTime
+ ", extractorParameters="
- + extractorParameters.getAttribute()
+ + extractorParameters
+ ", processorParameters="
- + processorParameters.getAttribute()
+ + processorParameters
+ ", connectorParameters="
- + connectorParameters.getAttribute()
+ + connectorParameters
+ "}";
}
}