This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 971605ddbcd [To dev/1.3] Pipe: Applied the login function to config receiver (#15258) & Pipe: Fixed the bug that config receiver cannot detect runtime password change (#15621) (#15647)
971605ddbcd is described below

commit 971605ddbcda74127f2460cfde0a4ad220c29f64
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 5 17:21:08 2025 +0800

    [To dev/1.3] Pipe: Applied the login function to config receiver (#15258) & Pipe: Fixed the bug that config receiver cannot detect runtime password change (#15621) (#15647)
    
    * Pipe: Applied the login function to config receiver (#15258)
    
    * Adjustment
---
 .../receiver/protocol/IoTDBConfigNodeReceiver.java | 12 ++--
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 78 ++++++++--------------
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 31 ++++++++-
 3 files changed, 63 insertions(+), 58 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 0986e50b31a..0fd091b146e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -461,13 +461,17 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
 
   @Override
   protected String getClusterId() {
-    return 
ConfigNode.getInstance().getConfigManager().getClusterManager().getClusterId();
+    return configManager.getClusterManager().getClusterId();
   }
 
   @Override
-  protected TSStatus tryLogin() {
-    // Do nothing. Login check will be done in the data node receiver.
-    return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  protected boolean shouldLogin() {
+    return lastSuccessfulLoginTime == Long.MIN_VALUE || super.shouldLogin();
+  }
+
+  @Override
+  protected TSStatus login() {
+    return configManager.login(username, password).getStatus();
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 975e8beb77d..7719a338fb6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -37,7 +37,6 @@ import 
org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.RetryUtils;
-import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -147,8 +146,6 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
   private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
 
-  private long lastSuccessfulLoginTime = Long.MIN_VALUE;
-
   private PipeMemoryBlock allocatedMemoryBlock;
 
   static {
@@ -415,29 +412,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   @Override
-  protected TSStatus tryLogin() {
-    final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
-    final long loginPeriodicVerificationIntervalMs =
-        
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
-    if (clientSession == null
-        || !clientSession.isLogin()
-        || (loginPeriodicVerificationIntervalMs >= 0
-            && lastSuccessfulLoginTime
-                < System.currentTimeMillis() - 
loginPeriodicVerificationIntervalMs)) {
-      final TSStatus status =
-          SESSION_MANAGER.login(
-              SESSION_MANAGER.getCurrSession(),
-              username,
-              password,
-              ZoneId.systemDefault().toString(),
-              SessionManager.CURRENT_RPC_VERSION,
-              IoTDBConstant.ClientVersion.V_1_0);
-      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        lastSuccessfulLoginTime = System.currentTimeMillis();
-      }
-      return status;
-    }
-    return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  protected boolean shouldLogin() {
+    // The idle time is updated per request
+    final IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    return clientSession == null || !clientSession.isLogin() || 
super.shouldLogin();
   }
 
   @Override
@@ -715,6 +693,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     }
 
     final TSStatus status = executeStatement(statement);
+
+    // Try to convert data type if the statement is a tree model statement
+    // and the status code is not success
     return shouldConvertDataTypeOnTypeMismatch
             && ((statement instanceof InsertBaseStatement
                     && ((InsertBaseStatement) 
statement).hasFailedMeasurements())
@@ -725,34 +706,14 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private TSStatus executeStatement(final Statement statement) {
-    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
-    final long loginPeriodicVerificationIntervalMs =
-        
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
-    if (clientSession == null
-        || !clientSession.isLogin()
-        || (loginPeriodicVerificationIntervalMs >= 0
-            && lastSuccessfulLoginTime
-                < System.currentTimeMillis() - 
loginPeriodicVerificationIntervalMs)) {
-      final BasicOpenSessionResp openSessionResp =
-          SESSION_MANAGER.login(
-              SESSION_MANAGER.getCurrSession(),
-              username,
-              password,
-              ZoneId.systemDefault().toString(),
-              SessionManager.CURRENT_RPC_VERSION,
-              IoTDBConstant.ClientVersion.V_1_0);
-      if (openSessionResp.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.warn(
-            "Receiver id = {}: Failed to open session, username = {}, response 
= {}.",
-            receiverId.get(),
-            username,
-            openSessionResp);
-        return RpcUtils.getStatus(openSessionResp.getCode(), 
openSessionResp.getMessage());
-      }
-      lastSuccessfulLoginTime = System.currentTimeMillis();
-      clientSession = SESSION_MANAGER.getCurrSession();
+    // Permission check
+    final TSStatus loginStatus = loginIfNecessary();
+    if (loginStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return loginStatus;
     }
 
+    final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
+
     final TSStatus status = AuthorityChecker.checkAuthority(statement, 
clientSession);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn(
@@ -777,6 +738,19 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         .status;
   }
 
+  @Override
+  protected TSStatus login() {
+    final BasicOpenSessionResp openSessionResp =
+        SESSION_MANAGER.login(
+            SESSION_MANAGER.getCurrSession(),
+            username,
+            password,
+            ZoneId.systemDefault().toString(),
+            SessionManager.CURRENT_RPC_VERSION,
+            IoTDBConstant.ClientVersion.V_1_0);
+    return RpcUtils.getStatus(openSessionResp.getCode(), 
openSessionResp.getMessage());
+  }
+
   @Override
   public synchronized void handleExit() {
     if (Objects.nonNull(configReceiverId.get())) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index c1f728ccfa6..1014b52d169 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -74,6 +74,10 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
   protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 
+  private static final long LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS =
+      
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
+  protected long lastSuccessfulLoginTime = Long.MIN_VALUE;
+
   private File writingFile;
   private RandomAccessFile writingFileWriter;
 
@@ -260,7 +264,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     if (passwordString != null) {
       password = passwordString;
     }
-    final TSStatus status = tryLogin();
+    final TSStatus status = loginIfNecessary();
     if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn(
           "Receiver id = {}: Handshake failed because login failed, response 
status = {}.",
@@ -313,7 +317,30 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
 
   protected abstract String getClusterId();
 
-  protected abstract TSStatus tryLogin();
+  protected boolean shouldLogin() {
+    return LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+        && lastSuccessfulLoginTime
+            < System.currentTimeMillis() - 
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS;
+  }
+
+  protected TSStatus loginIfNecessary() {
+    if (shouldLogin()) {
+      final TSStatus permissionCheckStatus = login();
+      if (permissionCheckStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.warn(
+            "Receiver id = {}: Failed to login, username = {}, response = {}.",
+            receiverId.get(),
+            username,
+            permissionCheckStatus);
+        return permissionCheckStatus;
+      } else {
+        lastSuccessfulLoginTime = System.currentTimeMillis();
+      }
+    }
+    return StatusUtils.OK;
+  }
+
+  protected abstract TSStatus login();
 
   protected final TPipeTransferResp handleTransferFilePiece(
       final PipeTransferFilePieceReq req,

Reply via email to