This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch validate-user-password-receiver
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b1bce2bd28555e67b5ddae23593bf78271f32de0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Oct 28 22:32:16 2024 +0800

    Pipe: pass username & password to receiver
---
 .../org/apache/iotdb/tool/tsfile/ImportTsFile.java |  2 ++
 .../iotdb/tool/tsfile/ImportTsFileRemotely.java    | 14 +++++++++++
 .../client/IoTDBConfigNodeSyncClientManager.java   |  4 ++++
 .../protocol/IoTDBConfigRegionAirGapConnector.java |  2 ++
 .../protocol/IoTDBConfigRegionConnector.java       |  4 ++++
 .../client/IoTDBDataNodeAsyncClientManager.java    | 28 +++++++++++++++-------
 .../client/IoTDBDataNodeSyncClientManager.java     |  4 ++++
 .../airgap/IoTDBDataNodeAirGapConnector.java       |  2 ++
 .../async/IoTDBDataRegionAsyncConnector.java       |  2 ++
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |  2 ++
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  1 +
 .../config/constant/PipeConnectorConstant.java     |  6 +++--
 .../pipe/connector/client/IoTDBClientManager.java  | 18 +++++++++++++-
 .../connector/client/IoTDBSyncClientManager.java   | 19 ++++++++-------
 .../common/PipeTransferHandshakeConstant.java      |  2 ++
 .../pipe/connector/protocol/IoTDBConnector.java    | 18 ++++++++++++++
 .../connector/protocol/IoTDBSslSyncConnector.java  |  4 ++++
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 16 +++++++++++++
 18 files changed, 128 insertions(+), 20 deletions(-)

diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
index 497900a0371..102e9ba5f0f 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
@@ -332,6 +332,8 @@ public class ImportTsFile extends AbstractTsFileTool {
     // ImportTsFileRemotely
     ImportTsFileRemotely.setHost(host);
     ImportTsFileRemotely.setPort(port);
+    ImportTsFileRemotely.setUsername(username);
+    ImportTsFileRemotely.setPassword(password);
 
     // ImportTsFileBase
     ImportTsFileBase.setSuccessAndFailDirAndOperation(
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index 8f3c22f9e0b..115ebb730ee 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
+import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -72,6 +73,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
   private static String host;
   private static String port;
 
+  private static String username = SessionConfig.DEFAULT_USER;
+  private static String password = SessionConfig.DEFAULT_PASSWORD;
+
   public ImportTsFileRemotely() {
     initClient();
     sendHandshake();
@@ -186,6 +190,8 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
         Boolean.toString(true));
     
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
LOAD_STRATEGY);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
     return params;
   }
 
@@ -335,4 +341,12 @@ public class ImportTsFileRemotely extends ImportTsFileBase 
{
   public static void setPort(final String port) {
     ImportTsFileRemotely.port = port;
   }
+
+  public static void setUsername(final String username) {
+    ImportTsFileRemotely.username = username;
+  }
+
+  public static void setPassword(final String password) {
+    ImportTsFileRemotely.password = password;
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index b2967e3bdf8..00a06c926c8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -35,6 +35,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
 
   public IoTDBConfigNodeSyncClientManager(
       List<TEndPoint> endPoints,
+      String username,
+      String password,
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
@@ -43,6 +45,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
       String loadTsFileStrategy) {
     super(
         endPoints,
+        username,
+        password,
         useSSL,
         trustStorePath,
         trustStorePwd,
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index e615616e2ba..7dd90f18dee 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -73,6 +73,8 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
         Boolean.toString(shouldReceiverConvertOnTypeMismatch));
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
 
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 307151df706..809daf044c0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -59,6 +59,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
   @Override
   protected IoTDBSyncClientManager constructClient(
       final List<TEndPoint> nodeUrls,
+      final String username,
+      final String password,
       final boolean useSSL,
       final String trustStorePath,
       final String trustStorePwd,
@@ -68,6 +70,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
       final String loadTsFileStrategy) {
     return new IoTDBConfigNodeSyncClientManager(
         nodeUrls,
+        username,
+        password,
         useSSL,
         Objects.nonNull(trustStorePath) ? 
ConfigNodeConfig.addHomeDir(trustStorePath) : null,
         trustStorePwd,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 549923882d5..1bbf5273187 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -39,6 +39,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -72,22 +73,32 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
   private final LoadBalancer loadBalancer;
 
-  private final boolean shouldReceiverConvertOnTypeMismatch;
-
-  private final String loadTsFileStrategy;
-
   public IoTDBDataNodeAsyncClientManager(
       List<TEndPoint> endPoints,
+      /* The following parameters are used locally. */
       boolean useLeaderCache,
       String loadBalanceStrategy,
+      /* The following parameters are used to handshake with the receiver. */
+      String username,
+      String password,
       boolean shouldReceiverConvertOnTypeMismatch,
       String loadTsFileStrategy) {
-    super(endPoints, useLeaderCache);
+    super(
+        endPoints,
+        username,
+        password,
+        shouldReceiverConvertOnTypeMismatch,
+        loadTsFileStrategy,
+        useLeaderCache);
 
     endPointSet = new HashSet<>(endPoints);
 
     receiverAttributes =
-        String.format("%s-%s", shouldReceiverConvertOnTypeMismatch, 
loadTsFileStrategy);
+        String.format(
+            "%s-%s-%s",
+            Base64.getEncoder().encodeToString((username + ":" + password).getBytes()),
+            shouldReceiverConvertOnTypeMismatch,
+            loadTsFileStrategy);
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
       if 
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
 {
         ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
@@ -118,9 +129,6 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
             loadBalanceStrategy);
         loadBalancer = new RoundRobinLoadBalancer();
     }
-
-    this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
-    this.loadTsFileStrategy = loadTsFileStrategy;
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
@@ -234,6 +242,8 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
           Boolean.toString(shouldReceiverConvertOnTypeMismatch));
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
+      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
 
       
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 5e4e0fbfcb8..ae3f07b068c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -44,6 +44,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
 
   public IoTDBDataNodeSyncClientManager(
       final List<TEndPoint> endPoints,
+      final String username,
+      final String password,
       final boolean useSSL,
       final String trustStorePath,
       final String trustStorePwd,
@@ -53,6 +55,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
       final String loadTsFileStrategy) {
     super(
         endPoints,
+        username,
+        password,
         useSSL,
         trustStorePath,
         trustStorePwd,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 33be8e002f4..788244f738c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -105,6 +105,8 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
         Boolean.toString(shouldReceiverConvertOnTypeMismatch));
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
 
     return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 5aa4324a2f3..f127b7d1734 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -125,6 +125,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
                 Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
                 CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
             loadBalanceStrategy,
+            username,
+            password,
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index e6f1ac97957..6c67b25a08b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -93,6 +93,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
             nodeUrls,
+            username,
+            password,
             useSSL,
             Objects.nonNull(trustStorePath) ? 
IoTDBConfig.addDataHomeDir(trustStorePath) : null,
             trustStorePwd,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 56789b69a99..e8ffce63f56 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -623,6 +623,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private static TSStatus executeStatement(final Statement statement) {
+    // user name & password
     return Coordinator.getInstance()
         .executeForTreeModel(
             new PipeEnrichedStatement(statement),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 77eccfb4069..89b5ddfada1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.config.constant;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.isession.SessionConfig;
 
 import com.github.luben.zstd.Zstd;
 
@@ -75,11 +76,12 @@ public class PipeConnectorConstant {
 
   public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
   public static final String SINK_IOTDB_USER_KEY = "sink.user";
-  public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
+  public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = SessionConfig.DEFAULT_USER;
 
   public static final String CONNECTOR_IOTDB_PASSWORD_KEY = 
"connector.password";
   public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password";
-  public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
+  public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE =
+      SessionConfig.DEFAULT_PASSWORD;
 
   public static final String 
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY =
       "connector.exception.data.convert-on-type-mismatch";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index 73e0543fe67..ed3334b2459 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -37,6 +37,12 @@ public abstract class IoTDBClientManager {
   protected final List<TEndPoint> endPointList;
   protected long currentClientIndex = 0;
 
+  protected final String username;
+  protected final String password;
+
+  protected final boolean shouldReceiverConvertOnTypeMismatch;
+  protected final String loadTsFileStrategy;
+
   protected final boolean useLeaderCache;
 
   // This flag indicates whether the receiver supports mods transferring if
@@ -48,8 +54,18 @@ public abstract class IoTDBClientManager {
   protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
       new 
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
 
-  protected IoTDBClientManager(List<TEndPoint> endPointList, boolean 
useLeaderCache) {
+  protected IoTDBClientManager(
+      List<TEndPoint> endPointList,
+      String username,
+      String password,
+      boolean shouldReceiverConvertOnTypeMismatch,
+      final String loadTsFileStrategy,
+      boolean useLeaderCache) {
     this.endPointList = endPointList;
+    this.username = username;
+    this.password = password;
+    this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
+    this.loadTsFileStrategy = loadTsFileStrategy;
     this.useLeaderCache = useLeaderCache;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 81b5194621c..022e4fc7b53 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -60,12 +60,10 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
 
   private final LoadBalancer loadBalancer;
 
-  private final boolean shouldReceiverConvertOnTypeMismatch;
-
-  private final String loadTsFileStrategy;
-
   protected IoTDBSyncClientManager(
       List<TEndPoint> endPoints,
+      String username,
+      String password,
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
@@ -73,7 +71,13 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       String loadBalanceStrategy,
       boolean shouldReceiverConvertOnTypeMismatch,
       String loadTsFileStrategy) {
-    super(endPoints, useLeaderCache);
+    super(
+        endPoints,
+        username,
+        password,
+        shouldReceiverConvertOnTypeMismatch,
+        loadTsFileStrategy,
+        useLeaderCache);
 
     this.useSSL = useSSL;
     this.trustStorePath = trustStorePath;
@@ -99,9 +103,6 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
             loadBalanceStrategy);
         loadBalancer = new RoundRobinLoadBalancer();
     }
-
-    this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
-    this.loadTsFileStrategy = loadTsFileStrategy;
   }
 
   public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -183,6 +184,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
           Boolean.toString(shouldReceiverConvertOnTypeMismatch));
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
+      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
 
       // Try to handshake by PipeTransferHandshakeV2Req.
       TPipeTransferResp resp = 
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index ec8818072d2..46bd38ba45a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -25,6 +25,8 @@ public class PipeTransferHandshakeConstant {
   public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
   public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH = 
"convertOnTypeMismatch";
   public static final String HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY = 
"loadTsFileStrategy";
+  public static final String HANDSHAKE_KEY_USERNAME = "username";
+  public static final String HANDSHAKE_KEY_PASSWORD = "password";
 
   private PipeTransferHandshakeConstant() {
     // Utility class
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index db58cec22ca..f76139b36e0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -78,8 +78,12 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
@@ -102,7 +106,9 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
@@ -118,6 +124,9 @@ public abstract class IoTDBConnector implements 
PipeConnector {
 
   protected final List<TEndPoint> nodeUrls = new ArrayList<>();
 
+  protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+  protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+
   protected String loadBalanceStrategy;
 
   protected String loadTsFileStrategy;
@@ -175,6 +184,15 @@ public abstract class IoTDBConnector implements 
PipeConnector {
             Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
             CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));
 
+    username =
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+            CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+    password =
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY),
+            CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+
     loadBalanceStrategy =
         parameters
             .getStringOrDefault(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 6a4dd9e9067..ffdc9f55b18 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -118,6 +118,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
     clientManager =
         constructClient(
             nodeUrls,
+            username,
+            password,
             useSSL,
             trustStorePath,
             trustStorePwd,
@@ -129,6 +131,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
 
   protected abstract IoTDBSyncClientManager constructClient(
       final List<TEndPoint> nodeUrls,
+      final String username,
+      final String password,
       final boolean useSSL,
       final String trustStorePath,
       final String trustStorePwd,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 4b37cf9aabd..59bcc06c45c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -53,6 +53,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 
 /**
  * {@link IoTDBFileReceiver} is the parent class of receiver on both 
configNode and DataNode,
@@ -67,6 +69,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   private static final AtomicLong RECEIVER_ID_GENERATOR = new AtomicLong(0);
   protected final AtomicLong receiverId = new AtomicLong(0);
 
+  protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+  protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+
   private File writingFile;
   private RandomAccessFile writingFileWriter;
 
@@ -242,6 +247,17 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               loadTsFileStrategyString));
     }
 
+    final String usernameString =
+        req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME);
+    if (usernameString != null) {
+      username = usernameString;
+    }
+    final String passwordString =
+        req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD);
+    if (passwordString != null) {
+      password = passwordString;
+    }
+
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation 
logic, though
     // it may not require the actual type of the v1 request.

Reply via email to