This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new fd0940ab670 Pipe: Support specifying receiver's username and password
when syncing data between clusters (#13933)
fd0940ab670 is described below
commit fd0940ab67044dbd3de71237ce003cd29a90d4c9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Oct 29 03:20:56 2024 +0800
Pipe: Support specifying receiver's username and password when syncing data
between clusters (#13933)
---
.../pipe/it/manual/IoTDBPipePermissionIT.java | 164 +++++++++++++++++++++
.../api/customizer/parameter/PipeParameters.java | 1 +
.../org/apache/iotdb/tool/tsfile/ImportTsFile.java | 2 +
.../iotdb/tool/tsfile/ImportTsFileRemotely.java | 14 ++
.../client/IoTDBConfigNodeSyncClientManager.java | 4 +
.../protocol/IoTDBConfigRegionAirGapConnector.java | 2 +
.../protocol/IoTDBConfigRegionConnector.java | 4 +
.../client/IoTDBDataNodeAsyncClientManager.java | 28 ++--
.../client/IoTDBDataNodeSyncClientManager.java | 4 +
.../airgap/IoTDBDataNodeAirGapConnector.java | 2 +
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 8 +-
.../connector/protocol/opcua/OpcUaConnector.java | 8 +-
.../async/IoTDBDataRegionAsyncConnector.java | 2 +
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 4 +
.../protocol/airgap/IoTDBAirGapReceiver.java | 8 +
.../protocol/thrift/IoTDBDataNodeReceiver.java | 50 ++++++-
.../config/constant/PipeConnectorConstant.java | 2 +
.../pipe/connector/client/IoTDBClientManager.java | 18 ++-
.../connector/client/IoTDBSyncClientManager.java | 19 ++-
.../common/PipeTransferHandshakeConstant.java | 2 +
.../pipe/connector/protocol/IoTDBConnector.java | 24 +++
.../connector/protocol/IoTDBSslSyncConnector.java | 4 +
.../commons/pipe/receiver/IoTDBFileReceiver.java | 16 ++
23 files changed, 363 insertions(+), 27 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
new file mode 100644
index 00000000000..a7e935a7548
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.manual;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2ManualCreateSchema.class})
+public class IoTDBPipePermissionIT extends AbstractPipeDualManualIT {
+ @Override
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+
+ // TODO: delete ratis configurations
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(false)
+ .setDefaultSchemaRegionGroupNumPerDatabase(1)
+ .setTimestampPrecision("ms")
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(false)
+ .setTimestampPrecision("ms")
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setSchemaReplicationFactor(3)
+ .setDataReplicationFactor(2);
+
+ // 10 min, assert that the operations will not time out
+ senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+ receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment(3, 3);
+ }
+
+ @Test
+ public void testWithSyncConnector() throws Exception {
+ testWithConnector("iotdb-thrift-sync-connector");
+ }
+
+ @Test
+ public void testWithAsyncConnector() throws Exception {
+ testWithConnector("iotdb-thrift-async-connector");
+ }
+
+ private void testWithConnector(final String connector) throws Exception {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ receiverEnv,
+ Arrays.asList(
+ "create user `thulab` 'passwd'",
+ "create role `admin`",
+ "grant role `admin` to `thulab`",
+ "grant WRITE, READ, MANAGE_DATABASE on root.** to role `admin`")))
{
+ return;
+ }
+
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "create timeseries root.ln.wf02.wt01.temperature with
datatype=INT64,encoding=PLAIN",
+ "create timeseries root.ln.wf02.wt01.status with
datatype=BOOLEAN,encoding=PLAIN",
+ "insert into root.ln.wf02.wt01(time, temperature, status) values
(1800000000000, 23, true)"))) {
+ fail();
+ return;
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.inclusion", "all");
+
+ connectorAttributes.put("connector", connector);
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ connectorAttributes.put("connector.username", "thulab");
+ connectorAttributes.put("connector.password", "passwd");
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ final Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add(
+
"root.ln.wf02.wt01.temperature,null,root.ln,INT64,PLAIN,LZ4,null,null,null,null,BASE,");
+ expectedResSet.add(
+
"root.ln.wf02.wt01.status,null,root.ln,BOOLEAN,PLAIN,LZ4,null,null,null,null,BASE,");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "show timeseries",
+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+ expectedResSet);
+ expectedResSet.clear();
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.ln.wf02.wt01.temperature,root.ln.wf02.wt01.status,",
+ Collections.singleton("1800000000000,23,true,"));
+ }
+ }
+}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 4d0f75790dc..3dcb2d19b0a 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -383,6 +383,7 @@ public class PipeParameters {
static {
KEYS.add("ssl.trust-store-pwd");
+ KEYS.add("password");
}
static String hide(final String key, final String value) {
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
index 497900a0371..102e9ba5f0f 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
@@ -332,6 +332,8 @@ public class ImportTsFile extends AbstractTsFileTool {
// ImportTsFileRemotely
ImportTsFileRemotely.setHost(host);
ImportTsFileRemotely.setPort(port);
+ ImportTsFileRemotely.setUsername(username);
+ ImportTsFileRemotely.setPassword(password);
// ImportTsFileBase
ImportTsFileBase.setSuccessAndFailDirAndOperation(
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index 8f3c22f9e0b..115ebb730ee 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -72,6 +73,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
private static String host;
private static String port;
+ private static String username = SessionConfig.DEFAULT_USER;
+ private static String password = SessionConfig.DEFAULT_PASSWORD;
+
public ImportTsFileRemotely() {
initClient();
sendHandshake();
@@ -186,6 +190,8 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
Boolean.toString(true));
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
LOAD_STRATEGY);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
return params;
}
@@ -335,4 +341,12 @@ public class ImportTsFileRemotely extends ImportTsFileBase
{
public static void setPort(final String port) {
ImportTsFileRemotely.port = port;
}
+
+ public static void setUsername(final String username) {
+ ImportTsFileRemotely.username = username;
+ }
+
+ public static void setPassword(final String password) {
+ ImportTsFileRemotely.password = password;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index b2967e3bdf8..00a06c926c8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -35,6 +35,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
public IoTDBConfigNodeSyncClientManager(
List<TEndPoint> endPoints,
+ String username,
+ String password,
boolean useSSL,
String trustStorePath,
String trustStorePwd,
@@ -43,6 +45,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
String loadTsFileStrategy) {
super(
endPoints,
+ username,
+ password,
useSSL,
trustStorePath,
trustStorePwd,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index e615616e2ba..7dd90f18dee 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -73,6 +73,8 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 307151df706..809daf044c0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -59,6 +59,8 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
@Override
protected IoTDBSyncClientManager constructClient(
final List<TEndPoint> nodeUrls,
+ final String username,
+ final String password,
final boolean useSSL,
final String trustStorePath,
final String trustStorePwd,
@@ -68,6 +70,8 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final String loadTsFileStrategy) {
return new IoTDBConfigNodeSyncClientManager(
nodeUrls,
+ username,
+ password,
useSSL,
Objects.nonNull(trustStorePath) ?
ConfigNodeConfig.addHomeDir(trustStorePath) : null,
trustStorePwd,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 549923882d5..1bbf5273187 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -39,6 +39,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -72,22 +73,32 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
private final LoadBalancer loadBalancer;
- private final boolean shouldReceiverConvertOnTypeMismatch;
-
- private final String loadTsFileStrategy;
-
public IoTDBDataNodeAsyncClientManager(
List<TEndPoint> endPoints,
+ /* The following parameters are used locally. */
boolean useLeaderCache,
String loadBalanceStrategy,
+ /* The following parameters are used to handshake with the receiver. */
+ String username,
+ String password,
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy) {
- super(endPoints, useLeaderCache);
+ super(
+ endPoints,
+ username,
+ password,
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy,
+ useLeaderCache);
endPointSet = new HashSet<>(endPoints);
receiverAttributes =
- String.format("%s-%s", shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
+ String.format(
+ "%s-%s-%s",
+ Base64.getEncoder().encodeToString((username + ":" +
password).getBytes()),
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy);
synchronized (IoTDBDataNodeAsyncClientManager.class) {
if
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
{
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
@@ -118,9 +129,6 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
loadBalanceStrategy);
loadBalancer = new RoundRobinLoadBalancer();
}
-
- this.shouldReceiverConvertOnTypeMismatch =
shouldReceiverConvertOnTypeMismatch;
- this.loadTsFileStrategy = loadTsFileStrategy;
}
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
@@ -234,6 +242,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME,
username);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD,
password);
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 5e4e0fbfcb8..ae3f07b068c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -44,6 +44,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
public IoTDBDataNodeSyncClientManager(
final List<TEndPoint> endPoints,
+ final String username,
+ final String password,
final boolean useSSL,
final String trustStorePath,
final String trustStorePwd,
@@ -53,6 +55,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
final String loadTsFileStrategy) {
super(
endPoints,
+ username,
+ password,
useSSL,
trustStorePath,
trustStorePwd,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 33be8e002f4..788244f738c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -105,6 +105,8 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 62a1f040624..ef05f8f30a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -77,6 +77,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
@@ -86,6 +87,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
public class IoTDBLegacyPipeConnector implements PipeConnector {
@@ -193,7 +195,11 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
user =
parameters.getStringOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+ Arrays.asList(
+ CONNECTOR_IOTDB_USER_KEY,
+ SINK_IOTDB_USER_KEY,
+ CONNECTOR_IOTDB_USERNAME_KEY,
+ SINK_IOTDB_USERNAME_KEY),
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
password =
parameters.getStringOrDefault(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 9ab7a110a6f..cfd898b62c1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
@@ -61,6 +62,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
@@ -113,7 +115,11 @@ public class OpcUaConnector implements PipeConnector {
final String user =
parameters.getStringOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+ Arrays.asList(
+ CONNECTOR_IOTDB_USER_KEY,
+ SINK_IOTDB_USER_KEY,
+ CONNECTOR_IOTDB_USERNAME_KEY,
+ SINK_IOTDB_USERNAME_KEY),
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
final String password =
parameters.getStringOrDefault(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 5aa4324a2f3..f127b7d1734 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -125,6 +125,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
loadBalanceStrategy,
+ username,
+ password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index e6f1ac97957..0ae8ae8743a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -83,6 +83,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
@Override
protected IoTDBSyncClientManager constructClient(
final List<TEndPoint> nodeUrls,
+ final String username,
+ final String password,
final boolean useSSL,
final String trustStorePath,
final String trustStorePwd,
@@ -93,6 +95,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
clientManager =
new IoTDBDataNodeSyncClientManager(
nodeUrls,
+ username,
+ password,
useSSL,
Objects.nonNull(trustStorePath) ?
IoTDBConfig.addDataHomeDir(trustStorePath) : null,
trustStorePwd,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 9fd3bcd4092..d24e097badf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -27,6 +27,9 @@ import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteRespo
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
+import org.apache.iotdb.db.protocol.session.ClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -73,6 +76,9 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId,
socket);
+ final ClientSession session = new ClientSession(socket);
+ SessionManager.getInstance().registerSession(session);
+
try {
while (!socket.isClosed()) {
isELanguagePayload = false;
@@ -91,6 +97,8 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
throw e;
} finally {
PipeDataNodeAgent.receiver().thrift().handleClientExit();
+ SessionManager.getInstance()
+ .closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution);
socket.close();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 56789b69a99..7445741ce35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -59,8 +60,9 @@ import
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
import
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
+import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
@@ -122,9 +124,9 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
new PipeStatementExceptionVisitor();
private static final PipeStatementPatternParseVisitor
STATEMENT_PATTERN_PARSE_VISITOR =
new PipeStatementPatternParseVisitor();
- private static final PipeStatementDataTypeConvertExecutionVisitor
- STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR =
- new
PipeStatementDataTypeConvertExecutionVisitor(IoTDBDataNodeReceiver::executeStatement);
+ private final PipeStatementDataTypeConvertExecutionVisitor
+ statementDataTypeConvertExecutionVisitor =
+ new
PipeStatementDataTypeConvertExecutionVisitor(this::executeStatement);
private final PipeStatementToBatchVisitor batchVisitor = new
PipeStatementToBatchVisitor();
// Used for data transfer: confignode (cluster A) -> datanode (cluster B) ->
confignode (cluster
@@ -137,6 +139,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private final PipeTransferSliceReqHandler sliceReqHandler = new
PipeTransferSliceReqHandler();
+ private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+
static {
try {
folderManager =
@@ -618,16 +622,48 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
&& ((statement instanceof InsertBaseStatement
&& ((InsertBaseStatement)
statement).hasFailedMeasurements())
|| status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode())
- ? statement.accept(STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR,
status).orElse(status)
+ ? statement.accept(statementDataTypeConvertExecutionVisitor,
status).orElse(status)
: status;
}
- private static TSStatus executeStatement(final Statement statement) {
+ private TSStatus executeStatement(final Statement statement) {
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ if (clientSession == null || !clientSession.isLogin()) {
+ final BasicOpenSessionResp openSessionResp =
+ SESSION_MANAGER.login(
+ SESSION_MANAGER.getCurrSession(),
+ username,
+ password,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0);
+ if (openSessionResp.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Receiver id = {}: Failed to open session, username = {}, response
= {}.",
+ receiverId.get(),
+ username,
+ openSessionResp);
+ return RpcUtils.getStatus(openSessionResp.getCode(),
openSessionResp.getMessage());
+ }
+ clientSession = SESSION_MANAGER.getCurrSession();
+ }
+
+ final TSStatus status = AuthorityChecker.checkAuthority(statement,
clientSession);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Receiver id = {}: Failed to check authority for statement {},
username = {}, response = {}.",
+ receiverId.get(),
+ statement.getType().name(),
+ username,
+ status);
+ return RpcUtils.getStatus(status.getCode(), status.getMessage());
+ }
+
return Coordinator.getInstance()
.executeForTreeModel(
new PipeEnrichedStatement(statement),
SessionManager.getInstance().requestQueryId(),
- new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
+ SESSION_MANAGER.getSessionInfo(clientSession),
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 77eccfb4069..afee3e9a0a2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -75,6 +75,8 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
public static final String SINK_IOTDB_USER_KEY = "sink.user";
+ public static final String CONNECTOR_IOTDB_USERNAME_KEY =
"connector.username";
+ public static final String SINK_IOTDB_USERNAME_KEY = "sink.username";
public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
public static final String CONNECTOR_IOTDB_PASSWORD_KEY =
"connector.password";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index 73e0543fe67..ed3334b2459 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -37,6 +37,12 @@ public abstract class IoTDBClientManager {
protected final List<TEndPoint> endPointList;
protected long currentClientIndex = 0;
+ protected final String username;
+ protected final String password;
+
+ protected final boolean shouldReceiverConvertOnTypeMismatch;
+ protected final String loadTsFileStrategy;
+
protected final boolean useLeaderCache;
// This flag indicates whether the receiver supports mods transferring if
@@ -48,8 +54,18 @@ public abstract class IoTDBClientManager {
protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
new
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
- protected IoTDBClientManager(List<TEndPoint> endPointList, boolean
useLeaderCache) {
+ protected IoTDBClientManager(
+ List<TEndPoint> endPointList,
+ String username,
+ String password,
+ boolean shouldReceiverConvertOnTypeMismatch,
+ final String loadTsFileStrategy,
+ boolean useLeaderCache) {
this.endPointList = endPointList;
+ this.username = username;
+ this.password = password;
+ this.shouldReceiverConvertOnTypeMismatch =
shouldReceiverConvertOnTypeMismatch;
+ this.loadTsFileStrategy = loadTsFileStrategy;
this.useLeaderCache = useLeaderCache;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 81b5194621c..022e4fc7b53 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -60,12 +60,10 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
private final LoadBalancer loadBalancer;
- private final boolean shouldReceiverConvertOnTypeMismatch;
-
- private final String loadTsFileStrategy;
-
protected IoTDBSyncClientManager(
List<TEndPoint> endPoints,
+ String username,
+ String password,
boolean useSSL,
String trustStorePath,
String trustStorePwd,
@@ -73,7 +71,13 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
String loadBalanceStrategy,
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy) {
- super(endPoints, useLeaderCache);
+ super(
+ endPoints,
+ username,
+ password,
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy,
+ useLeaderCache);
this.useSSL = useSSL;
this.trustStorePath = trustStorePath;
@@ -99,9 +103,6 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
loadBalanceStrategy);
loadBalancer = new RoundRobinLoadBalancer();
}
-
- this.shouldReceiverConvertOnTypeMismatch =
shouldReceiverConvertOnTypeMismatch;
- this.loadTsFileStrategy = loadTsFileStrategy;
}
public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -183,6 +184,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME,
username);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD,
password);
// Try to handshake by PipeTransferHandshakeV2Req.
TPipeTransferResp resp =
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index ec8818072d2..46bd38ba45a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -25,6 +25,8 @@ public class PipeTransferHandshakeConstant {
public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH =
"convertOnTypeMismatch";
public static final String HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY =
"loadTsFileStrategy";
+ public static final String HANDSHAKE_KEY_USERNAME = "username";
+ public static final String HANDSHAKE_KEY_PASSWORD = "password";
private PipeTransferHandshakeConstant() {
// Utility class
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index db58cec22ca..8c116d70ce9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -78,8 +78,13 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
@@ -102,7 +107,10 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
@@ -118,6 +126,9 @@ public abstract class IoTDBConnector implements
PipeConnector {
protected final List<TEndPoint> nodeUrls = new ArrayList<>();
+ protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+ protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+
protected String loadBalanceStrategy;
protected String loadTsFileStrategy;
@@ -175,6 +186,19 @@ public abstract class IoTDBConnector implements
PipeConnector {
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));
+ username =
+ parameters.getStringOrDefault(
+ Arrays.asList(
+ CONNECTOR_IOTDB_USER_KEY,
+ SINK_IOTDB_USER_KEY,
+ CONNECTOR_IOTDB_USERNAME_KEY,
+ SINK_IOTDB_USERNAME_KEY),
+ CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+ password =
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY,
SINK_IOTDB_PASSWORD_KEY),
+ CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+
loadBalanceStrategy =
parameters
.getStringOrDefault(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 6a4dd9e9067..ffdc9f55b18 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -118,6 +118,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
clientManager =
constructClient(
nodeUrls,
+ username,
+ password,
useSSL,
trustStorePath,
trustStorePwd,
@@ -129,6 +131,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
protected abstract IoTDBSyncClientManager constructClient(
final List<TEndPoint> nodeUrls,
+ final String username,
+ final String password,
final boolean useSSL,
final String trustStorePath,
final String trustStorePwd,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 4b37cf9aabd..59bcc06c45c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -53,6 +53,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
/**
* {@link IoTDBFileReceiver} is the parent class of receiver on both
configNode and DataNode,
@@ -67,6 +69,9 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
private static final AtomicLong RECEIVER_ID_GENERATOR = new AtomicLong(0);
protected final AtomicLong receiverId = new AtomicLong(0);
+ protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+ protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+
private File writingFile;
private RandomAccessFile writingFileWriter;
@@ -242,6 +247,17 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
loadTsFileStrategyString));
}
+ final String usernameString =
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME);
+ if (usernameString != null) {
+ username = usernameString;
+ }
+ final String passwordString =
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD);
+ if (passwordString != null) {
+ password = passwordString;
+ }
+
// Handle the handshake request as a v1 request.
// Here we construct a fake "dataNode" request to pass the v1 validation
logic, though
// it may not require the actual type of the v1 request.