This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch validate-user-password-receiver in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b1bce2bd28555e67b5ddae23593bf78271f32de0 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Oct 28 22:32:16 2024 +0800 Pipe: pass username & password to receiver --- .../org/apache/iotdb/tool/tsfile/ImportTsFile.java | 2 ++ .../iotdb/tool/tsfile/ImportTsFileRemotely.java | 14 +++++++++++ .../client/IoTDBConfigNodeSyncClientManager.java | 4 ++++ .../protocol/IoTDBConfigRegionAirGapConnector.java | 2 ++ .../protocol/IoTDBConfigRegionConnector.java | 4 ++++ .../client/IoTDBDataNodeAsyncClientManager.java | 28 +++++++++++++++------- .../client/IoTDBDataNodeSyncClientManager.java | 4 ++++ .../airgap/IoTDBDataNodeAirGapConnector.java | 2 ++ .../async/IoTDBDataRegionAsyncConnector.java | 2 ++ .../thrift/sync/IoTDBDataNodeSyncConnector.java | 2 ++ .../protocol/thrift/IoTDBDataNodeReceiver.java | 1 + .../config/constant/PipeConnectorConstant.java | 6 +++-- .../pipe/connector/client/IoTDBClientManager.java | 18 +++++++++++++- .../connector/client/IoTDBSyncClientManager.java | 19 ++++++++------- .../common/PipeTransferHandshakeConstant.java | 2 ++ .../pipe/connector/protocol/IoTDBConnector.java | 18 ++++++++++++++ .../connector/protocol/IoTDBSslSyncConnector.java | 4 ++++ .../commons/pipe/receiver/IoTDBFileReceiver.java | 16 +++++++++++++ 18 files changed, 128 insertions(+), 20 deletions(-) diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java index 497900a0371..102e9ba5f0f 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java @@ -332,6 +332,8 @@ public class ImportTsFile extends AbstractTsFileTool { // ImportTsFileRemotely ImportTsFileRemotely.setHost(host); ImportTsFileRemotely.setPort(port); + ImportTsFileRemotely.setUsername(username); + ImportTsFileRemotely.setPassword(password); // ImportTsFileBase ImportTsFileBase.setSuccessAndFailDirAndOperation( diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java index 8f3c22f9e0b..115ebb730ee 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq; +import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; @@ -72,6 +73,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase { private static String host; private static String port; + private static String username = SessionConfig.DEFAULT_USER; + private static String password = SessionConfig.DEFAULT_PASSWORD; + public ImportTsFileRemotely() { initClient(); sendHandshake(); @@ -186,6 +190,8 @@ public class ImportTsFileRemotely extends ImportTsFileBase { PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, Boolean.toString(true)); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, LOAD_STRATEGY); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); return params; } @@ -335,4 +341,12 @@ public class ImportTsFileRemotely extends ImportTsFileBase { public static void setPort(final String port) { ImportTsFileRemotely.port = port; } + + public static void setUsername(final String username) { + ImportTsFileRemotely.username = username; + } + + public static void setPassword(final String password) { + ImportTsFileRemotely.password = password; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java index b2967e3bdf8..00a06c926c8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java @@ -35,6 +35,8 @@ public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager { public IoTDBConfigNodeSyncClientManager( List<TEndPoint> endPoints, + String username, + String password, boolean useSSL, String trustStorePath, String trustStorePwd, @@ -43,6 +45,8 @@ public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager { String loadTsFileStrategy) { super( endPoints, + username, + password, useSSL, trustStorePath, trustStorePwd, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java index e615616e2ba..7dd90f18dee 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java @@ -73,6 +73,8 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector { Boolean.toString(shouldReceiverConvertOnTypeMismatch)); params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index 307151df706..809daf044c0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -59,6 +59,8 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { @Override protected IoTDBSyncClientManager constructClient( final List<TEndPoint> nodeUrls, + final String username, + final String password, final boolean useSSL, final String trustStorePath, final String trustStorePwd, @@ -68,6 +70,8 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { final String loadTsFileStrategy) { return new IoTDBConfigNodeSyncClientManager( nodeUrls, + username, + password, useSSL, Objects.nonNull(trustStorePath) ? ConfigNodeConfig.addHomeDir(trustStorePath) : null, trustStorePwd, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 549923882d5..1bbf5273187 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -39,6 +39,7 @@ import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Base64; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -72,22 +73,32 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager private final LoadBalancer loadBalancer; - private final boolean shouldReceiverConvertOnTypeMismatch; - - private final String loadTsFileStrategy; - public IoTDBDataNodeAsyncClientManager( List<TEndPoint> endPoints, + /* The following parameters are used locally. */ boolean useLeaderCache, String loadBalanceStrategy, + /* The following parameters are used to handshake with the receiver. */ + String username, + String password, boolean shouldReceiverConvertOnTypeMismatch, String loadTsFileStrategy) { - super(endPoints, useLeaderCache); + super( + endPoints, + username, + password, + shouldReceiverConvertOnTypeMismatch, + loadTsFileStrategy, + useLeaderCache); endPointSet = new HashSet<>(endPoints); receiverAttributes = - String.format("%s-%s", shouldReceiverConvertOnTypeMismatch, loadTsFileStrategy); + String.format( + "%s-%s-%s", + Base64.getEncoder().encodeToString((username + ":" + password).getBytes()), + shouldReceiverConvertOnTypeMismatch, + loadTsFileStrategy); synchronized (IoTDBDataNodeAsyncClientManager.class) { if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) { ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent( @@ -118,9 +129,6 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager loadBalanceStrategy); loadBalancer = new RoundRobinLoadBalancer(); } - - this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch; - this.loadTsFileStrategy = loadTsFileStrategy; } public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { @@ -234,6 +242,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager Boolean.toString(shouldReceiverConvertOnTypeMismatch)); params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs()); client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java index 5e4e0fbfcb8..ae3f07b068c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java @@ -44,6 +44,8 @@ public class IoTDBDataNodeSyncClientManager extends IoTDBSyncClientManager public IoTDBDataNodeSyncClientManager( final List<TEndPoint> endPoints, + final String username, + final String password, final boolean useSSL, final String trustStorePath, final String trustStorePwd, @@ -53,6 +55,8 @@ public class IoTDBDataNodeSyncClientManager extends IoTDBSyncClientManager final String loadTsFileStrategy) { super( endPoints, + username, + password, useSSL, trustStorePath, trustStorePwd, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java index 33be8e002f4..788244f738c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java @@ -105,6 +105,8 @@ public abstract class IoTDBDataNodeAirGapConnector extends IoTDBAirGapConnector Boolean.toString(shouldReceiverConvertOnTypeMismatch)); params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 5aa4324a2f3..f127b7d1734 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -125,6 +125,8 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, CONNECTOR_LEADER_CACHE_ENABLE_KEY), CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE), loadBalanceStrategy, + username, + password, shouldReceiverConvertOnTypeMismatch, loadTsFileStrategy); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index e6f1ac97957..6c67b25a08b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -93,6 +93,8 @@ public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { clientManager = new IoTDBDataNodeSyncClientManager( nodeUrls, + username, + password, useSSL, Objects.nonNull(trustStorePath) ? IoTDBConfig.addDataHomeDir(trustStorePath) : null, trustStorePwd, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 56789b69a99..e8ffce63f56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -623,6 +623,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { } private static TSStatus executeStatement(final Statement statement) { + // user name & password return Coordinator.getInstance() .executeForTreeModel( new PipeEnrichedStatement(statement), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index 77eccfb4069..89b5ddfada1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.config.constant; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.isession.SessionConfig; import com.github.luben.zstd.Zstd; @@ -75,11 +76,12 @@ public class PipeConnectorConstant { public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user"; public static final String SINK_IOTDB_USER_KEY = "sink.user"; - public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root"; + public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = SessionConfig.DEFAULT_USER; public static final String CONNECTOR_IOTDB_PASSWORD_KEY = "connector.password"; public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password"; - public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root"; + public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = + SessionConfig.DEFAULT_PASSWORD; public static final String CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY = "connector.exception.data.convert-on-type-mismatch"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java index 73e0543fe67..ed3334b2459 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java @@ -37,6 +37,12 @@ public abstract class IoTDBClientManager { protected final List<TEndPoint> endPointList; protected long currentClientIndex = 0; + protected final String username; + protected final String password; + + protected final boolean shouldReceiverConvertOnTypeMismatch; + protected final String loadTsFileStrategy; + protected final boolean useLeaderCache; // This flag indicates whether the receiver supports mods transferring if @@ -48,8 +54,18 @@ public abstract class IoTDBClientManager { protected static final AtomicInteger CONNECTION_TIMEOUT_MS = new AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs()); - protected IoTDBClientManager(List<TEndPoint> endPointList, boolean useLeaderCache) { + protected IoTDBClientManager( + List<TEndPoint> endPointList, + String username, + String password, + boolean shouldReceiverConvertOnTypeMismatch, + final String loadTsFileStrategy, + boolean useLeaderCache) { this.endPointList = endPointList; + this.username = username; + this.password = password; + this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch; + this.loadTsFileStrategy = loadTsFileStrategy; this.useLeaderCache = useLeaderCache; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java index 81b5194621c..022e4fc7b53 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java @@ -60,12 +60,10 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen private final LoadBalancer loadBalancer; - private final boolean shouldReceiverConvertOnTypeMismatch; - - private final String loadTsFileStrategy; - protected IoTDBSyncClientManager( List<TEndPoint> endPoints, + String username, + String password, boolean useSSL, String trustStorePath, String trustStorePwd, @@ -73,7 +71,13 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen String loadBalanceStrategy, boolean shouldReceiverConvertOnTypeMismatch, String loadTsFileStrategy) { - super(endPoints, useLeaderCache); + super( + endPoints, + username, + password, + shouldReceiverConvertOnTypeMismatch, + loadTsFileStrategy, + useLeaderCache); this.useSSL = useSSL; this.trustStorePath = trustStorePath; @@ -99,9 +103,6 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen loadBalanceStrategy); loadBalancer = new RoundRobinLoadBalancer(); } - - this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch; - this.loadTsFileStrategy = loadTsFileStrategy; } public void checkClientStatusAndTryReconstructIfNecessary() { @@ -183,6 +184,8 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen Boolean.toString(shouldReceiverConvertOnTypeMismatch)); params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); // Try to handshake by PipeTransferHandshakeV2Req. TPipeTransferResp resp = client.pipeTransfer(buildHandshakeV2Req(params)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java index ec8818072d2..46bd38ba45a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java @@ -25,6 +25,8 @@ public class PipeTransferHandshakeConstant { public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID"; public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH = "convertOnTypeMismatch"; public static final String HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY = "loadTsFileStrategy"; + public static final String HANDSHAKE_KEY_USERNAME = "username"; + public static final String HANDSHAKE_KEY_PASSWORD = "password"; private PipeTransferHandshakeConstant() { // Utility class diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index db58cec22ca..f76139b36e0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -78,8 +78,12 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET; @@ -102,7 +106,9 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY; @@ -118,6 +124,9 @@ public abstract class IoTDBConnector implements PipeConnector { protected final List<TEndPoint> nodeUrls = new ArrayList<>(); + protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE; + protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; + protected String loadBalanceStrategy; protected String loadTsFileStrategy; @@ -175,6 +184,15 @@ public abstract class IoTDBConnector implements PipeConnector { Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY), CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE)); + username = + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY), + CONNECTOR_IOTDB_USER_DEFAULT_VALUE); + password = + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY), + CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); + loadBalanceStrategy = parameters .getStringOrDefault( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index 6a4dd9e9067..ffdc9f55b18 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -118,6 +118,8 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { clientManager = constructClient( nodeUrls, + username, + password, useSSL, trustStorePath, trustStorePwd, @@ -129,6 +131,8 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { protected abstract IoTDBSyncClientManager constructClient( final List<TEndPoint> nodeUrls, + final String username, + final String password, final boolean useSSL, final String trustStorePath, final String trustStorePwd, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 4b37cf9aabd..59bcc06c45c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -53,6 +53,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; /** * {@link IoTDBFileReceiver} is the parent class of receiver on both configNode and DataNode, @@ -67,6 +69,9 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { private static final AtomicLong RECEIVER_ID_GENERATOR = new AtomicLong(0); protected final AtomicLong receiverId = new AtomicLong(0); + protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE; + protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; + private File writingFile; private RandomAccessFile writingFileWriter; @@ -242,6 +247,17 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { loadTsFileStrategyString)); } + final String usernameString = + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME); + if (usernameString != null) { + username = usernameString; + } + final String passwordString = + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD); + if (passwordString != null) { + password = passwordString; + } + // Handle the handshake request as a v1 request. // Here we construct a fake "dataNode" request to valid from v1 validation logic, though // it may not require the actual type of the v1 request.
