This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 7a01f418de5 Pipe: Add login logic to receiver handshake (#14401)
(#14440)
7a01f418de5 is described below
commit 7a01f418de586a6bb60c52c443f6e502fbd57632
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Dec 16 14:21:44 2024 +0800
Pipe: Add login logic to receiver handshake (#14401) (#14440)
* Pipe: Add login logic to receiver handshake (#14401)
Co-authored-by: Steve Yurong Su <[email protected]>
(cherry picked from commit edc318540ffff1a7940f919b4e2c49ce401c61d1)
---
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 12 +++++++
.../protocol/airgap/IoTDBAirGapReceiver.java | 7 ++--
.../protocol/thrift/IoTDBDataNodeReceiver.java | 25 ++++++++++++++
.../commons/pipe/receiver/IoTDBFileReceiver.java | 39 ++++++++++++++++------
4 files changed, 67 insertions(+), 16 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index fe8b383b56f..4293ecfd68a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
@@ -328,6 +329,12 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
return
ConfigNode.getInstance().getConfigManager().getClusterManager().getClusterId();
}
+ @Override
+ protected TSStatus tryLogin() {
+ // Do nothing. Login check will be done in the data node receiver.
+ return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
@Override
protected String getReceiverFileBaseDir() {
return
ConfigNodeDescriptor.getInstance().getConf().getPipeReceiverFileDir();
@@ -383,4 +390,9 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
}
return PipeReceiverStatusHandler.getPriorStatus(results);
}
+
+ @Override
+ protected void closeSession() {
+ // Do nothing. The session will be closed in the data node receiver.
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 96b07b4a678..1d8424a1270 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
import org.apache.iotdb.db.protocol.session.ClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -76,8 +75,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId,
socket);
- final ClientSession session = new ClientSession(socket);
- SessionManager.getInstance().registerSession(session);
+ SessionManager.getInstance().registerSession(new ClientSession(socket));
try {
while (!socket.isClosed()) {
@@ -96,9 +94,8 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
e);
throw e;
} finally {
+ // session will be closed and removed here
PipeDataNodeAgent.receiver().thrift().handleClientExit();
- SessionManager.getInstance()
- .closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution);
socket.close();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 2734a808eb5..9f103de32d1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -404,6 +405,21 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return IoTDBDescriptor.getInstance().getConfig().getClusterId();
}
+ @Override
+ protected TSStatus tryLogin() {
+ final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
+ if (clientSession == null || !clientSession.isLogin()) {
+ return SESSION_MANAGER.login(
+ SESSION_MANAGER.getCurrSession(),
+ username,
+ password,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0);
+ }
+ return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
@Override
protected String getReceiverFileBaseDir() throws
DiskSpaceInsufficientException {
// Get next receiver file base dir by folder manager
@@ -696,4 +712,13 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
super.handleExit();
}
+
+ @Override
+ protected void closeSession() {
+ final IClientSession session = SESSION_MANAGER.getCurrSession();
+ if (session != null) {
+ SESSION_MANAGER.closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution);
+ }
+ SESSION_MANAGER.removeCurrSession();
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 6584e344421..aaeb5cd1e46 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -240,6 +240,27 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
return new TPipeTransferResp(status);
}
+ final String usernameString =
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME);
+ if (usernameString != null) {
+ username = usernameString;
+ }
+ final String passwordString =
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD);
+ if (passwordString != null) {
+ password = passwordString;
+ }
+ final TSStatus status = tryLogin();
+ if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Receiver id = {}: Handshake failed because login failed, response
status = {}.",
+ receiverId.get(),
+ status);
+ return new TPipeTransferResp(status);
+ } else {
+ LOGGER.info("Receiver id = {}: User {} login successfully.",
receiverId.get(), username);
+ }
+
final String shouldConvertDataTypeOnTypeMismatchString =
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH);
if (shouldConvertDataTypeOnTypeMismatchString != null) {
@@ -256,17 +277,6 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
loadTsFileStrategyString));
}
- final String usernameString =
-
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME);
- if (usernameString != null) {
- username = usernameString;
- }
- final String passwordString =
-
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD);
- if (passwordString != null) {
- password = passwordString;
- }
-
// Handle the handshake request as a v1 request.
// Here we construct a fake "dataNode" request to valid from v1 validation
logic, though
// it may not require the actual type of the v1 request.
@@ -281,6 +291,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
protected abstract String getClusterId();
+ protected abstract TSStatus tryLogin();
+
protected final TPipeTransferResp handleTransferFilePiece(
final PipeTransferFilePieceReq req,
final boolean isRequestThroughAirGap,
@@ -790,6 +802,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
}
+ // Close the session
+ closeSession();
+
LOGGER.info("Receiver id = {}: Handling exit: Receiver exited.",
receiverId.get());
}
+
+ protected abstract void closeSession();
}