This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new fd0940ab670 Pipe: Support specifing receiver's username and password 
when syncing data between clusters (#13933)
fd0940ab670 is described below

commit fd0940ab67044dbd3de71237ce003cd29a90d4c9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Oct 29 03:20:56 2024 +0800

    Pipe: Support specifing receiver's username and password when syncing data 
between clusters (#13933)
---
 .../pipe/it/manual/IoTDBPipePermissionIT.java      | 164 +++++++++++++++++++++
 .../api/customizer/parameter/PipeParameters.java   |   1 +
 .../org/apache/iotdb/tool/tsfile/ImportTsFile.java |   2 +
 .../iotdb/tool/tsfile/ImportTsFileRemotely.java    |  14 ++
 .../client/IoTDBConfigNodeSyncClientManager.java   |   4 +
 .../protocol/IoTDBConfigRegionAirGapConnector.java |   2 +
 .../protocol/IoTDBConfigRegionConnector.java       |   4 +
 .../client/IoTDBDataNodeAsyncClientManager.java    |  28 ++--
 .../client/IoTDBDataNodeSyncClientManager.java     |   4 +
 .../airgap/IoTDBDataNodeAirGapConnector.java       |   2 +
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  |   8 +-
 .../connector/protocol/opcua/OpcUaConnector.java   |   8 +-
 .../async/IoTDBDataRegionAsyncConnector.java       |   2 +
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |   4 +
 .../protocol/airgap/IoTDBAirGapReceiver.java       |   8 +
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  50 ++++++-
 .../config/constant/PipeConnectorConstant.java     |   2 +
 .../pipe/connector/client/IoTDBClientManager.java  |  18 ++-
 .../connector/client/IoTDBSyncClientManager.java   |  19 ++-
 .../common/PipeTransferHandshakeConstant.java      |   2 +
 .../pipe/connector/protocol/IoTDBConnector.java    |  24 +++
 .../connector/protocol/IoTDBSslSyncConnector.java  |   4 +
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  16 ++
 23 files changed, 363 insertions(+), 27 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
new file mode 100644
index 00000000000..a7e935a7548
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.manual;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2ManualCreateSchema.class})
+public class IoTDBPipePermissionIT extends AbstractPipeDualManualIT {
+  @Override
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+
+    // TODO: delete ratis configurations
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(false)
+        .setDefaultSchemaRegionGroupNumPerDatabase(1)
+        .setTimestampPrecision("ms")
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(false)
+        .setTimestampPrecision("ms")
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setSchemaReplicationFactor(3)
+        .setDataReplicationFactor(2);
+
+    // 10 min, assert that the operations will not time out
+    senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+    receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment(3, 3);
+  }
+
+  @Test
+  public void testWithSyncConnector() throws Exception {
+    testWithConnector("iotdb-thrift-sync-connector");
+  }
+
+  @Test
+  public void testWithAsyncConnector() throws Exception {
+    testWithConnector("iotdb-thrift-async-connector");
+  }
+
+  private void testWithConnector(final String connector) throws Exception {
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        receiverEnv,
+        Arrays.asList(
+            "create user `thulab` 'passwd'",
+            "create role `admin`",
+            "grant role `admin` to `thulab`",
+            "grant WRITE, READ, MANAGE_DATABASE on root.** to role `admin`"))) 
{
+      return;
+    }
+
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "create timeseries root.ln.wf02.wt01.temperature with 
datatype=INT64,encoding=PLAIN",
+              "create timeseries root.ln.wf02.wt01.status with 
datatype=BOOLEAN,encoding=PLAIN",
+              "insert into root.ln.wf02.wt01(time, temperature, status) values 
(1800000000000, 23, true)"))) {
+        fail();
+        return;
+      }
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.inclusion", "all");
+
+      connectorAttributes.put("connector", connector);
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.username", "thulab");
+      connectorAttributes.put("connector.password", "passwd");
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      final Set<String> expectedResSet = new HashSet<>();
+      expectedResSet.add(
+          
"root.ln.wf02.wt01.temperature,null,root.ln,INT64,PLAIN,LZ4,null,null,null,null,BASE,");
+      expectedResSet.add(
+          
"root.ln.wf02.wt01.status,null,root.ln,BOOLEAN,PLAIN,LZ4,null,null,null,null,BASE,");
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "show timeseries",
+          
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+          expectedResSet);
+      expectedResSet.clear();
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.ln.wf02.wt01.temperature,root.ln.wf02.wt01.status,",
+          Collections.singleton("1800000000000,23,true,"));
+    }
+  }
+}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 4d0f75790dc..3dcb2d19b0a 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -383,6 +383,7 @@ public class PipeParameters {
 
     static {
       KEYS.add("ssl.trust-store-pwd");
+      KEYS.add("password");
     }
 
     static String hide(final String key, final String value) {
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
index 497900a0371..102e9ba5f0f 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
@@ -332,6 +332,8 @@ public class ImportTsFile extends AbstractTsFileTool {
     // ImportTsFileRemotely
     ImportTsFileRemotely.setHost(host);
     ImportTsFileRemotely.setPort(port);
+    ImportTsFileRemotely.setUsername(username);
+    ImportTsFileRemotely.setPassword(password);
 
     // ImportTsFileBase
     ImportTsFileBase.setSuccessAndFailDirAndOperation(
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index 8f3c22f9e0b..115ebb730ee 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
+import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -72,6 +73,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
   private static String host;
   private static String port;
 
+  private static String username = SessionConfig.DEFAULT_USER;
+  private static String password = SessionConfig.DEFAULT_PASSWORD;
+
   public ImportTsFileRemotely() {
     initClient();
     sendHandshake();
@@ -186,6 +190,8 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
         Boolean.toString(true));
     
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
LOAD_STRATEGY);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
     return params;
   }
 
@@ -335,4 +341,12 @@ public class ImportTsFileRemotely extends ImportTsFileBase 
{
   public static void setPort(final String port) {
     ImportTsFileRemotely.port = port;
   }
+
+  public static void setUsername(final String username) {
+    ImportTsFileRemotely.username = username;
+  }
+
+  public static void setPassword(final String password) {
+    ImportTsFileRemotely.password = password;
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index b2967e3bdf8..00a06c926c8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -35,6 +35,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
 
   public IoTDBConfigNodeSyncClientManager(
       List<TEndPoint> endPoints,
+      String username,
+      String password,
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
@@ -43,6 +45,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
       String loadTsFileStrategy) {
     super(
         endPoints,
+        username,
+        password,
         useSSL,
         trustStorePath,
         trustStorePwd,
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index e615616e2ba..7dd90f18dee 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -73,6 +73,8 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
         Boolean.toString(shouldReceiverConvertOnTypeMismatch));
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
 
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 307151df706..809daf044c0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -59,6 +59,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
   @Override
   protected IoTDBSyncClientManager constructClient(
       final List<TEndPoint> nodeUrls,
+      final String username,
+      final String password,
       final boolean useSSL,
       final String trustStorePath,
       final String trustStorePwd,
@@ -68,6 +70,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
       final String loadTsFileStrategy) {
     return new IoTDBConfigNodeSyncClientManager(
         nodeUrls,
+        username,
+        password,
         useSSL,
         Objects.nonNull(trustStorePath) ? 
ConfigNodeConfig.addHomeDir(trustStorePath) : null,
         trustStorePwd,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 549923882d5..1bbf5273187 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -39,6 +39,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -72,22 +73,32 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
   private final LoadBalancer loadBalancer;
 
-  private final boolean shouldReceiverConvertOnTypeMismatch;
-
-  private final String loadTsFileStrategy;
-
   public IoTDBDataNodeAsyncClientManager(
       List<TEndPoint> endPoints,
+      /* The following parameters are used locally. */
       boolean useLeaderCache,
       String loadBalanceStrategy,
+      /* The following parameters are used to handshake with the receiver. */
+      String username,
+      String password,
       boolean shouldReceiverConvertOnTypeMismatch,
       String loadTsFileStrategy) {
-    super(endPoints, useLeaderCache);
+    super(
+        endPoints,
+        username,
+        password,
+        shouldReceiverConvertOnTypeMismatch,
+        loadTsFileStrategy,
+        useLeaderCache);
 
     endPointSet = new HashSet<>(endPoints);
 
     receiverAttributes =
-        String.format("%s-%s", shouldReceiverConvertOnTypeMismatch, 
loadTsFileStrategy);
+        String.format(
+            "%s-%s-%s",
+            Base64.getEncoder().encodeToString((username + ":" + 
password).getBytes()),
+            shouldReceiverConvertOnTypeMismatch,
+            loadTsFileStrategy);
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
       if 
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
 {
         ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
@@ -118,9 +129,6 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
             loadBalanceStrategy);
         loadBalancer = new RoundRobinLoadBalancer();
     }
-
-    this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
-    this.loadTsFileStrategy = loadTsFileStrategy;
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
@@ -234,6 +242,8 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
           Boolean.toString(shouldReceiverConvertOnTypeMismatch));
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
+      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, 
username);
+      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, 
password);
 
       
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 5e4e0fbfcb8..ae3f07b068c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -44,6 +44,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
 
   public IoTDBDataNodeSyncClientManager(
       final List<TEndPoint> endPoints,
+      final String username,
+      final String password,
       final boolean useSSL,
       final String trustStorePath,
       final String trustStorePwd,
@@ -53,6 +55,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
       final String loadTsFileStrategy) {
     super(
         endPoints,
+        username,
+        password,
         useSSL,
         trustStorePath,
         trustStorePwd,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 33be8e002f4..788244f738c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -105,6 +105,8 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
         Boolean.toString(shouldReceiverConvertOnTypeMismatch));
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
 
     return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 62a1f040624..ef05f8f30a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -77,6 +77,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
@@ -86,6 +87,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
 
 public class IoTDBLegacyPipeConnector implements PipeConnector {
@@ -193,7 +195,11 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
 
     user =
         parameters.getStringOrDefault(
-            Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+            Arrays.asList(
+                CONNECTOR_IOTDB_USER_KEY,
+                SINK_IOTDB_USER_KEY,
+                CONNECTOR_IOTDB_USERNAME_KEY,
+                SINK_IOTDB_USERNAME_KEY),
             CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
     password =
         parameters.getStringOrDefault(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 9ab7a110a6f..cfd898b62c1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
@@ -61,6 +62,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
@@ -113,7 +115,11 @@ public class OpcUaConnector implements PipeConnector {
 
     final String user =
         parameters.getStringOrDefault(
-            Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+            Arrays.asList(
+                CONNECTOR_IOTDB_USER_KEY,
+                SINK_IOTDB_USER_KEY,
+                CONNECTOR_IOTDB_USERNAME_KEY,
+                SINK_IOTDB_USERNAME_KEY),
             CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
     final String password =
         parameters.getStringOrDefault(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 5aa4324a2f3..f127b7d1734 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -125,6 +125,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
                 Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
                 CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
             loadBalanceStrategy,
+            username,
+            password,
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index e6f1ac97957..0ae8ae8743a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -83,6 +83,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
   @Override
   protected IoTDBSyncClientManager constructClient(
       final List<TEndPoint> nodeUrls,
+      final String username,
+      final String password,
       final boolean useSSL,
       final String trustStorePath,
       final String trustStorePwd,
@@ -93,6 +95,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
             nodeUrls,
+            username,
+            password,
             useSSL,
             Objects.nonNull(trustStorePath) ? 
IoTDBConfig.addDataHomeDir(trustStorePath) : null,
             trustStorePwd,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 9fd3bcd4092..d24e097badf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -27,6 +27,9 @@ import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteRespo
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
+import org.apache.iotdb.db.protocol.session.ClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -73,6 +76,9 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
     LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, 
socket);
 
+    final ClientSession session = new ClientSession(socket);
+    SessionManager.getInstance().registerSession(session);
+
     try {
       while (!socket.isClosed()) {
         isELanguagePayload = false;
@@ -91,6 +97,8 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
       throw e;
     } finally {
       PipeDataNodeAgent.receiver().thrift().handleClientExit();
+      SessionManager.getInstance()
+          .closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution);
       socket.close();
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 56789b69a99..7445741ce35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -59,8 +60,9 @@ import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
+import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
@@ -122,9 +124,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       new PipeStatementExceptionVisitor();
   private static final PipeStatementPatternParseVisitor 
STATEMENT_PATTERN_PARSE_VISITOR =
       new PipeStatementPatternParseVisitor();
-  private static final PipeStatementDataTypeConvertExecutionVisitor
-      STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR =
-          new 
PipeStatementDataTypeConvertExecutionVisitor(IoTDBDataNodeReceiver::executeStatement);
+  private final PipeStatementDataTypeConvertExecutionVisitor
+      statementDataTypeConvertExecutionVisitor =
+          new 
PipeStatementDataTypeConvertExecutionVisitor(this::executeStatement);
   private final PipeStatementToBatchVisitor batchVisitor = new 
PipeStatementToBatchVisitor();
 
   // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> 
confignode (cluster
@@ -137,6 +139,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
   private final PipeTransferSliceReqHandler sliceReqHandler = new 
PipeTransferSliceReqHandler();
 
+  private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
+
   static {
     try {
       folderManager =
@@ -618,16 +622,48 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             && ((statement instanceof InsertBaseStatement
                     && ((InsertBaseStatement) 
statement).hasFailedMeasurements())
                 || status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode())
-        ? statement.accept(STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR, 
status).orElse(status)
+        ? statement.accept(statementDataTypeConvertExecutionVisitor, 
status).orElse(status)
         : status;
   }
 
-  private static TSStatus executeStatement(final Statement statement) {
+  private TSStatus executeStatement(final Statement statement) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (clientSession == null || !clientSession.isLogin()) {
+      final BasicOpenSessionResp openSessionResp =
+          SESSION_MANAGER.login(
+              SESSION_MANAGER.getCurrSession(),
+              username,
+              password,
+              ZoneId.systemDefault().toString(),
+              SessionManager.CURRENT_RPC_VERSION,
+              IoTDBConstant.ClientVersion.V_1_0);
+      if (openSessionResp.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.warn(
+            "Receiver id = {}: Failed to open session, username = {}, response 
= {}.",
+            receiverId.get(),
+            username,
+            openSessionResp);
+        return RpcUtils.getStatus(openSessionResp.getCode(), 
openSessionResp.getMessage());
+      }
+      clientSession = SESSION_MANAGER.getCurrSession();
+    }
+
+    final TSStatus status = AuthorityChecker.checkAuthority(statement, 
clientSession);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn(
+          "Receiver id = {}: Failed to check authority for statement {}, 
username = {}, response = {}.",
+          receiverId.get(),
+          statement.getType().name(),
+          username,
+          status);
+      return RpcUtils.getStatus(status.getCode(), status.getMessage());
+    }
+
     return Coordinator.getInstance()
         .executeForTreeModel(
             new PipeEnrichedStatement(statement),
             SessionManager.getInstance().requestQueryId(),
-            new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
+            SESSION_MANAGER.getSessionInfo(clientSession),
             "",
             ClusterPartitionFetcher.getInstance(),
             ClusterSchemaFetcher.getInstance(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 77eccfb4069..afee3e9a0a2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -75,6 +75,8 @@ public class PipeConnectorConstant {
 
   public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
   public static final String SINK_IOTDB_USER_KEY = "sink.user";
+  public static final String CONNECTOR_IOTDB_USERNAME_KEY = 
"connector.username";
+  public static final String SINK_IOTDB_USERNAME_KEY = "sink.username";
   public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
 
   public static final String CONNECTOR_IOTDB_PASSWORD_KEY = 
"connector.password";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index 73e0543fe67..ed3334b2459 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -37,6 +37,12 @@ public abstract class IoTDBClientManager {
   protected final List<TEndPoint> endPointList;
   protected long currentClientIndex = 0;
 
+  protected final String username;
+  protected final String password;
+
+  protected final boolean shouldReceiverConvertOnTypeMismatch;
+  protected final String loadTsFileStrategy;
+
   protected final boolean useLeaderCache;
 
   // This flag indicates whether the receiver supports mods transferring if
@@ -48,8 +54,18 @@ public abstract class IoTDBClientManager {
   protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
       new 
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
 
-  protected IoTDBClientManager(List<TEndPoint> endPointList, boolean 
useLeaderCache) {
+  protected IoTDBClientManager(
+      List<TEndPoint> endPointList,
+      String username,
+      String password,
+      boolean shouldReceiverConvertOnTypeMismatch,
+      final String loadTsFileStrategy,
+      boolean useLeaderCache) {
     this.endPointList = endPointList;
+    this.username = username;
+    this.password = password;
+    this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
+    this.loadTsFileStrategy = loadTsFileStrategy;
     this.useLeaderCache = useLeaderCache;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 81b5194621c..022e4fc7b53 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -60,12 +60,10 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
 
   private final LoadBalancer loadBalancer;
 
-  private final boolean shouldReceiverConvertOnTypeMismatch;
-
-  private final String loadTsFileStrategy;
-
   protected IoTDBSyncClientManager(
       List<TEndPoint> endPoints,
+      String username,
+      String password,
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
@@ -73,7 +71,13 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       String loadBalanceStrategy,
       boolean shouldReceiverConvertOnTypeMismatch,
       String loadTsFileStrategy) {
-    super(endPoints, useLeaderCache);
+    super(
+        endPoints,
+        username,
+        password,
+        shouldReceiverConvertOnTypeMismatch,
+        loadTsFileStrategy,
+        useLeaderCache);
 
     this.useSSL = useSSL;
     this.trustStorePath = trustStorePath;
@@ -99,9 +103,6 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
             loadBalanceStrategy);
         loadBalancer = new RoundRobinLoadBalancer();
     }
-
-    this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
-    this.loadTsFileStrategy = loadTsFileStrategy;
   }
 
   public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -183,6 +184,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
           Boolean.toString(shouldReceiverConvertOnTypeMismatch));
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
+      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, 
username);
+      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, 
password);
 
       // Try to handshake by PipeTransferHandshakeV2Req.
       TPipeTransferResp resp = 
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index ec8818072d2..46bd38ba45a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -25,6 +25,8 @@ public class PipeTransferHandshakeConstant {
   public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
   public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH = 
"convertOnTypeMismatch";
   public static final String HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY = 
"loadTsFileStrategy";
+  public static final String HANDSHAKE_KEY_USERNAME = "username";
+  public static final String HANDSHAKE_KEY_PASSWORD = "password";
 
   private PipeTransferHandshakeConstant() {
     // Utility class
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index db58cec22ca..8c116d70ce9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -78,8 +78,13 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
@@ -102,7 +107,10 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
@@ -118,6 +126,9 @@ public abstract class IoTDBConnector implements 
PipeConnector {
 
   protected final List<TEndPoint> nodeUrls = new ArrayList<>();
 
+  protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+  protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+
   protected String loadBalanceStrategy;
 
   protected String loadTsFileStrategy;
@@ -175,6 +186,19 @@ public abstract class IoTDBConnector implements 
PipeConnector {
             Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
             CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));
 
+    username =
+        parameters.getStringOrDefault(
+            Arrays.asList(
+                CONNECTOR_IOTDB_USER_KEY,
+                SINK_IOTDB_USER_KEY,
+                CONNECTOR_IOTDB_USERNAME_KEY,
+                SINK_IOTDB_USERNAME_KEY),
+            CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+    password =
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, 
SINK_IOTDB_PASSWORD_KEY),
+            CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+
     loadBalanceStrategy =
         parameters
             .getStringOrDefault(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 6a4dd9e9067..ffdc9f55b18 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -118,6 +118,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
     clientManager =
         constructClient(
             nodeUrls,
+            username,
+            password,
             useSSL,
             trustStorePath,
             trustStorePwd,
@@ -129,6 +131,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
 
   protected abstract IoTDBSyncClientManager constructClient(
       final List<TEndPoint> nodeUrls,
+      final String username,
+      final String password,
       final boolean useSSL,
       final String trustStorePath,
       final String trustStorePwd,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 4b37cf9aabd..59bcc06c45c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -53,6 +53,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 
 /**
  * {@link IoTDBFileReceiver} is the parent class of receiver on both 
configNode and DataNode,
@@ -67,6 +69,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   private static final AtomicLong RECEIVER_ID_GENERATOR = new AtomicLong(0);
   protected final AtomicLong receiverId = new AtomicLong(0);
 
+  protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+  protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+
   private File writingFile;
   private RandomAccessFile writingFileWriter;
 
@@ -242,6 +247,17 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               loadTsFileStrategyString));
     }
 
+    final String usernameString =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME);
+    if (usernameString != null) {
+      username = usernameString;
+    }
+    final String passwordString =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD);
+    if (passwordString != null) {
+      password = passwordString;
+    }
+
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation 
logic, though
     // it may not require the actual type of the v1 request.

Reply via email to