This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 7a01f418de5 Pipe: Add login logic to receiver handshake (#14401) 
(#14440)
7a01f418de5 is described below

commit 7a01f418de586a6bb60c52c443f6e502fbd57632
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Dec 16 14:21:44 2024 +0800

    Pipe: Add login logic to receiver handshake (#14401) (#14440)
    
    * Pipe: Add login logic to receiver handshake (#14401)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    
    (cherry picked from commit edc318540ffff1a7940f919b4e2c49ce401c61d1)
---
 .../receiver/protocol/IoTDBConfigNodeReceiver.java | 12 +++++++
 .../protocol/airgap/IoTDBAirGapReceiver.java       |  7 ++--
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 25 ++++++++++++++
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 39 ++++++++++++++++------
 4 files changed, 67 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index fe8b383b56f..4293ecfd68a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.schema.ttl.TTLCache;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
@@ -328,6 +329,12 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
     return 
ConfigNode.getInstance().getConfigManager().getClusterManager().getClusterId();
   }
 
+  @Override
+  protected TSStatus tryLogin() {
+    // Do nothing. Login check will be done in the data node receiver.
+    return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
   @Override
   protected String getReceiverFileBaseDir() {
     return 
ConfigNodeDescriptor.getInstance().getConf().getPipeReceiverFileDir();
@@ -383,4 +390,9 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
     }
     return PipeReceiverStatusHandler.getPriorStatus(results);
   }
+
+  @Override
+  protected void closeSession() {
+    // Do nothing. The session will be closed in the data node receiver.
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 96b07b4a678..1d8424a1270 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
 import org.apache.iotdb.db.protocol.session.ClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -76,8 +75,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
     LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, 
socket);
 
-    final ClientSession session = new ClientSession(socket);
-    SessionManager.getInstance().registerSession(session);
+    SessionManager.getInstance().registerSession(new ClientSession(socket));
 
     try {
       while (!socket.isClosed()) {
@@ -96,9 +94,8 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
           e);
       throw e;
     } finally {
+      // session will be closed and removed here
       PipeDataNodeAgent.receiver().thrift().handleClientExit();
-      SessionManager.getInstance()
-          .closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution);
       socket.close();
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 2734a808eb5..9f103de32d1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -404,6 +405,21 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return IoTDBDescriptor.getInstance().getConfig().getClusterId();
   }
 
+  @Override
+  protected TSStatus tryLogin() {
+    final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
+    if (clientSession == null || !clientSession.isLogin()) {
+      return SESSION_MANAGER.login(
+          SESSION_MANAGER.getCurrSession(),
+          username,
+          password,
+          ZoneId.systemDefault().toString(),
+          SessionManager.CURRENT_RPC_VERSION,
+          IoTDBConstant.ClientVersion.V_1_0);
+    }
+    return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
   @Override
   protected String getReceiverFileBaseDir() throws 
DiskSpaceInsufficientException {
     // Get next receiver file base dir by folder manager
@@ -696,4 +712,13 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
     super.handleExit();
   }
+
+  @Override
+  protected void closeSession() {
+    final IClientSession session = SESSION_MANAGER.getCurrSession();
+    if (session != null) {
+      SESSION_MANAGER.closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution);
+    }
+    SESSION_MANAGER.removeCurrSession();
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 6584e344421..aaeb5cd1e46 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -240,6 +240,27 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       return new TPipeTransferResp(status);
     }
 
+    final String usernameString =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME);
+    if (usernameString != null) {
+      username = usernameString;
+    }
+    final String passwordString =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD);
+    if (passwordString != null) {
+      password = passwordString;
+    }
+    final TSStatus status = tryLogin();
+    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn(
+          "Receiver id = {}: Handshake failed because login failed, response 
status = {}.",
+          receiverId.get(),
+          status);
+      return new TPipeTransferResp(status);
+    } else {
+      LOGGER.info("Receiver id = {}: User {} login successfully.", 
receiverId.get(), username);
+    }
+
     final String shouldConvertDataTypeOnTypeMismatchString =
         
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH);
     if (shouldConvertDataTypeOnTypeMismatchString != null) {
@@ -256,17 +277,6 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               loadTsFileStrategyString));
     }
 
-    final String usernameString =
-        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME);
-    if (usernameString != null) {
-      username = usernameString;
-    }
-    final String passwordString =
-        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD);
-    if (passwordString != null) {
-      password = passwordString;
-    }
-
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation 
logic, though
     // it may not require the actual type of the v1 request.
@@ -281,6 +291,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
 
   protected abstract String getClusterId();
 
+  protected abstract TSStatus tryLogin();
+
   protected final TPipeTransferResp handleTransferFilePiece(
       final PipeTransferFilePieceReq req,
       final boolean isRequestThroughAirGap,
@@ -790,6 +802,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       }
     }
 
+    // Close the session
+    closeSession();
+
     LOGGER.info("Receiver id = {}: Handling exit: Receiver exited.", 
receiverId.get());
   }
+
+  protected abstract void closeSession();
 }

Reply via email to