This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dd7bada581d Pipe: conceal SSL trust store password & improve exception 
messages for sink connection establishment (#11806)
dd7bada581d is described below

commit dd7bada581ddd592f0dbd2fe5af1f3246075454c
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Jan 2 14:29:52 2024 +0800

    Pipe: conceal SSL trust store password & improve exception messages for 
sink connection establishment (#11806)
    
    1. Conceal the `ssl.trust-store-pwd` value when pipe parameters are
       printed (e.g. in `show pipes` output or in logs).
    2. Apply stricter SSL checks in the async connector: reject the trust
       store path and password attributes as well as the SSL enable flag.
    3. Wrap exceptions thrown while constructing
       `IoTDBThriftSyncConnectorClient` in `PipeConnectionException`.
    4. Wrap exceptions thrown while parsing node URLs in
       `PipeParameterNotValidException`.
---
 .../api/customizer/parameter/PipeParameters.java   | 33 ++++++++-
 .../api/exception/PipeConnectionException.java     |  2 +
 .../exception/PipeParameterNotValidException.java  |  3 +
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  | 25 ++++---
 .../thrift/async/IoTDBThriftAsyncConnector.java    | 10 ++-
 .../thrift/sync/IoTDBThriftSyncClientManager.java  | 39 ++++++----
 .../builtin/connector/iotdb/IoTDBConnector.java    | 82 +++++++++++++++-------
 .../connector/iotdb/thrift/IoTDBMetaConnector.java | 34 +++++----
 .../commons/pipe/task/meta/PipeStaticMeta.java     |  6 +-
 9 files changed, 160 insertions(+), 74 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 89ebaa6bcb7..9a12f8e315c 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -29,7 +29,10 @@ import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeCo
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Used in {@link PipeExtractor#customize(PipeParameters, 
PipeExtractorRuntimeConfiguration)} ,
@@ -263,9 +266,17 @@ public class PipeParameters {
     return attributes.hashCode();
   }
 
+  /**
+   * When exposing the content of this `PipeParameters` to external sources 
(such as `show pipes` or
+   * logging), please use this `toString` method to prevent the leakage of 
private information.
+   */
   @Override
   public String toString() {
-    return attributes.toString();
+    return attributes.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                Entry::getKey, entry -> ValueHider.hide(entry.getKey(), 
entry.getValue())))
+        .toString();
   }
 
   private static class KeyReducer {
@@ -293,4 +304,24 @@ public class PipeParameters {
       return key;
     }
   }
+
+  public static class ValueHider {
+    private static final Set<String> KEYS = new HashSet<>();
+
+    private static final String PLACEHOLDER = "******";
+
+    static {
+      KEYS.add("ssl.trust-store-pwd");
+    }
+
+    static String hide(String key, String value) {
+      if (Objects.isNull(key)) {
+        return value;
+      }
+      if (KEYS.contains(KeyReducer.reduce(key))) {
+        return PLACEHOLDER;
+      }
+      return value;
+    }
+  }
 }
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
index 14cacc73fe9..7ad133a7190 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.pipe.api.exception;
 
 public class PipeConnectionException extends PipeException {
 
+  public static String CONNECTION_ERROR_FORMATTER = "Connect to receiver %s:%s 
error, because: %s";
+
   public PipeConnectionException(String message) {
     super(message);
   }
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
index 9788bb3f4a3..c11513b007f 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.pipe.api.exception;
 
 public class PipeParameterNotValidException extends PipeException {
 
+  public static String PARSE_URL_ERROR_FORMATTER =
+      "Error parsing node urls from target servers %s, because %s";
+
   public PipeParameterNotValidException(String message) {
     super(message);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index f3c6881f5f3..d946aa28bfe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -216,19 +216,18 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
   public void handshake() throws Exception {
     close();
 
-    client =
-        new IoTDBThriftSyncConnectorClient(
-            new ThriftClientProperty.Builder()
-                
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
-                
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
-                .build(),
-            ipAddress,
-            port,
-            useSSL,
-            trustStore,
-            trustStorePwd);
-
     try {
+      client =
+          new IoTDBThriftSyncConnectorClient(
+              new ThriftClientProperty.Builder()
+                  
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
+                  
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
+                  .build(),
+              ipAddress,
+              port,
+              useSSL,
+              trustStore,
+              trustStorePwd);
       final TSyncIdentityInfo identityInfo =
           new TSyncIdentityInfo(
               pipeName, creationTime, syncConnectorVersion, 
IoTDBConstant.PATH_ROOT);
@@ -244,7 +243,7 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
     } catch (TException e) {
       throw new PipeConnectionException(
           String.format(
-              "Connect to receiver %s:%s error, because: %s", ipAddress, port, 
e.getMessage()),
+              PipeConnectionException.CONNECTION_ERROR_FORMATTER, ipAddress, 
port, e.getMessage()),
           e);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index e8460af85bb..a66ce02a630 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -60,6 +60,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
 
 public class IoTDBThriftAsyncConnector extends IoTDBConnector {
@@ -94,9 +96,11 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
 
     final PipeParameters parameters = validator.getParameters();
     validator.validate(
-        useSSL -> !((boolean) useSSL),
-        "IoTDBThriftAsyncConnector does not support SSL transmission 
currently",
-        parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false));
+        args -> !((boolean) args[0] || (boolean) args[1] || (boolean) args[2]),
+        "Only 'iotdb-thrift-ssl-sink' supports SSL transmission currently.",
+        parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false),
+        parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY),
+        parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
index 4425439c07f..c2873838e4a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
@@ -72,8 +72,7 @@ public class IoTDBThriftSyncClientManager extends 
IoTDBThriftClientManager imple
     }
   }
 
-  public void checkClientStatusAndTryReconstructIfNecessary()
-      throws IOException, TTransportException {
+  public void checkClientStatusAndTryReconstructIfNecessary() throws 
IOException {
     // reconstruct all dead clients
     for (final Map.Entry<TEndPoint, Pair<IoTDBThriftSyncConnectorClient, 
Boolean>> entry :
         endPoint2ClientAndStatus.entrySet()) {
@@ -96,7 +95,7 @@ public class IoTDBThriftSyncClientManager extends 
IoTDBThriftClientManager imple
             "All target servers %s are not available.", 
endPoint2ClientAndStatus.keySet()));
   }
 
-  private void reconstructClient(TEndPoint endPoint) throws 
TTransportException, IOException {
+  private void reconstructClient(TEndPoint endPoint) throws IOException {
     final Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus =
         endPoint2ClientAndStatus.get(endPoint);
 
@@ -112,18 +111,28 @@ public class IoTDBThriftSyncClientManager extends 
IoTDBThriftClientManager imple
       }
     }
 
-    clientAndStatus.setLeft(
-        new IoTDBThriftSyncConnectorClient(
-            new ThriftClientProperty.Builder()
-                .setConnectionTimeoutMs((int) 
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
-                .setRpcThriftCompressionEnabled(
-                    PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
-                .build(),
-            endPoint.getIp(),
-            endPoint.getPort(),
-            useSSL,
-            trustStorePath,
-            trustStorePwd));
+    try {
+      clientAndStatus.setLeft(
+          new IoTDBThriftSyncConnectorClient(
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs((int) 
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+                  .setRpcThriftCompressionEnabled(
+                      PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+                  .build(),
+              endPoint.getIp(),
+              endPoint.getPort(),
+              useSSL,
+              trustStorePath,
+              trustStorePwd));
+    } catch (TTransportException e) {
+      throw new PipeConnectionException(
+          String.format(
+              PipeConnectionException.CONNECTION_ERROR_FORMATTER,
+              endPoint.getIp(),
+              endPoint.getPort(),
+              e.getMessage()),
+          e);
+    }
 
     try {
       final TPipeTransferResp resp =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
index 6c4ad1ac634..b88d420c5e2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
@@ -92,37 +94,65 @@ public abstract class IoTDBConnector implements 
PipeConnector {
     LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
   }
 
-  protected Set<TEndPoint> parseNodeUrls(PipeParameters parameters) {
+  protected Set<TEndPoint> parseNodeUrls(PipeParameters parameters)
+      throws PipeParameterNotValidException {
     final Set<TEndPoint> givenNodeUrls = new HashSet<>(nodeUrls);
 
-    if (parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY)
-        && parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
-      givenNodeUrls.add(
-          new TEndPoint(
-              parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY),
-              parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
+    try {
+      if (parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY)
+          && parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
+        givenNodeUrls.add(
+            new TEndPoint(
+                parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY),
+                parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
+      }
+
+      if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
+          && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
+        givenNodeUrls.add(
+            new TEndPoint(
+                parameters.getStringByKeys(SINK_IOTDB_IP_KEY),
+                parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
+      }
+
+      if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
+        givenNodeUrls.addAll(
+            NodeUrlUtils.parseTEndPointUrls(
+                Arrays.asList(
+                    
parameters.getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
+      }
+
+      if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
+        givenNodeUrls.addAll(
+            NodeUrlUtils.parseTEndPointUrls(
+                
Arrays.asList(parameters.getStringByKeys(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+      }
+    } catch (Exception e) {
+      throw new PipeParameterNotValidException(
+          String.format(
+              PipeParameterNotValidException.PARSE_URL_ERROR_FORMATTER, 
givenNodeUrls, e));
     }
 
-    if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
-        && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
-      givenNodeUrls.add(
-          new TEndPoint(
-              parameters.getStringByKeys(SINK_IOTDB_IP_KEY),
-              parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
-    }
-
-    if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
-      givenNodeUrls.addAll(
-          NodeUrlUtils.parseTEndPointUrls(
-              
Arrays.asList(parameters.getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
-    }
+    checkNodeUrls(givenNodeUrls);
+    return givenNodeUrls;
+  }
 
-    if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
-      givenNodeUrls.addAll(
-          NodeUrlUtils.parseTEndPointUrls(
-              
Arrays.asList(parameters.getStringByKeys(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+  private void checkNodeUrls(Set<TEndPoint> nodeUrls) throws 
PipeParameterNotValidException {
+    for (TEndPoint nodeUrl : nodeUrls) {
+      if (Objects.isNull(nodeUrl.ip) || nodeUrl.ip.isEmpty()) {
+        throw new PipeParameterNotValidException(
+            String.format(
+                PipeParameterNotValidException.PARSE_URL_ERROR_FORMATTER,
+                nodeUrls,
+                "ip cannot be empty"));
+      }
+      if (nodeUrl.port == 0) {
+        throw new PipeParameterNotValidException(
+            String.format(
+                PipeParameterNotValidException.PARSE_URL_ERROR_FORMATTER,
+                nodeUrls,
+                "port cannot be empty"));
+      }
     }
-
-    return givenNodeUrls;
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/thrift/IoTDBMetaConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/thrift/IoTDBMetaConnector.java
index bc244a9aa63..7563fb19f68 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/thrift/IoTDBMetaConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/thrift/IoTDBMetaConnector.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -168,19 +169,26 @@ public abstract class IoTDBMetaConnector extends 
IoTDBConnector {
         }
       }
 
-      clients.set(
-          i,
-          new IoTDBThriftSyncConnectorClient(
-              new ThriftClientProperty.Builder()
-                  .setConnectionTimeoutMs((int) 
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
-                  .setRpcThriftCompressionEnabled(
-                      PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
-                  .build(),
-              ip,
-              port,
-              false,
-              null,
-              null));
+      try {
+        clients.set(
+            i,
+            new IoTDBThriftSyncConnectorClient(
+                new ThriftClientProperty.Builder()
+                    .setConnectionTimeoutMs((int) 
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+                    .setRpcThriftCompressionEnabled(
+                        
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+                    .build(),
+                ip,
+                port,
+                false,
+                null,
+                null));
+      } catch (TTransportException e) {
+        throw new PipeConnectionException(
+            String.format(
+                PipeConnectionException.CONNECTION_ERROR_FORMATTER, ip, port, 
e.getMessage()),
+            e);
+      }
 
       // TODO: validate client connectivity here, just like in ThriftSync.
       isClientAlive.set(i, true);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index 9be4ce369d1..1f393bd9b3f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -221,11 +221,11 @@ public class PipeStaticMeta {
         + "', creationTime="
         + creationTime
         + ", extractorParameters="
-        + extractorParameters.getAttribute()
+        + extractorParameters
         + ", processorParameters="
-        + processorParameters.getAttribute()
+        + processorParameters
         + ", connectorParameters="
-        + connectorParameters.getAttribute()
+        + connectorParameters
         + "}";
   }
 }

Reply via email to