This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 43eb55ad927 Pipe: Introduce periodic verification for username and 
password on receiver side (#14764)
43eb55ad927 is described below

commit 43eb55ad92778c883fbc43962077f0b9b2b23bc8
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Feb 5 15:23:21 2025 +0800

    Pipe: Introduce periodic verification for username and password on receiver 
side (#14764)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 42 ++++++++++++++++------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 12 +++++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  6 ++++
 .../iotdb/commons/pipe/config/PipeConfig.java      | 10 ++++++
 4 files changed, 60 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 80cbac6a3ef..ec0274a38fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferSliceReqHandler;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
@@ -175,6 +176,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private static final Set<String> ALREADY_CREATED_DATABASES = 
ConcurrentHashMap.newKeySet();
 
   private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
+  private static final long LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS =
+      
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
+  private long lastSuccessfulLoginTime = Long.MIN_VALUE;
 
   static {
     try {
@@ -492,14 +496,23 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   @Override
   protected TSStatus tryLogin() {
     final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
-    if (clientSession == null || !clientSession.isLogin()) {
-      return SESSION_MANAGER.login(
-          SESSION_MANAGER.getCurrSession(),
-          username,
-          password,
-          ZoneId.systemDefault().toString(),
-          SessionManager.CURRENT_RPC_VERSION,
-          IoTDBConstant.ClientVersion.V_1_0);
+    if (clientSession == null
+        || !clientSession.isLogin()
+        || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+            && lastSuccessfulLoginTime
+                < System.currentTimeMillis() - 
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
+      final TSStatus status =
+          SESSION_MANAGER.login(
+              SESSION_MANAGER.getCurrSession(),
+              username,
+              password,
+              ZoneId.systemDefault().toString(),
+              SessionManager.CURRENT_RPC_VERSION,
+              IoTDBConstant.ClientVersion.V_1_0);
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        lastSuccessfulLoginTime = System.currentTimeMillis();
+      }
+      return status;
     }
     return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
@@ -808,7 +821,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     // Permission check
     TSStatus permissionCheckStatus;
     IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
-    if (clientSession == null || !clientSession.isLogin()) {
+    if (clientSession == null
+        || !clientSession.isLogin()
+        || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+            && lastSuccessfulLoginTime
+                < System.currentTimeMillis() - 
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
       permissionCheckStatus = login();
       if (permissionCheckStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return permissionCheckStatus;
@@ -887,6 +904,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           openSessionResp);
       return RpcUtils.getStatus(openSessionResp.getCode(), 
openSessionResp.getMessage());
     }
+    lastSuccessfulLoginTime = System.currentTimeMillis();
     return RpcUtils.SUCCESS_STATUS;
   }
 
@@ -989,7 +1007,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     try {
       // Permission check
       final IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
-      if (clientSession == null || !clientSession.isLogin()) {
+      if (clientSession == null
+          || !clientSession.isLogin()
+          || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+              && lastSuccessfulLoginTime
+                  < System.currentTimeMillis() - 
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
         final TSStatus result = login();
         if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           return result;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 400b6822352..b2eeaf25f2c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -251,6 +251,8 @@ public class CommonConfig {
   private boolean pipeAirGapReceiverEnabled = false;
   private int pipeAirGapReceiverPort = 9780;
 
+  private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
+
   private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
   private int pipeMaxAllowedPinnedMemTableCount = 50;
@@ -997,6 +999,16 @@ public class CommonConfig {
     return pipeAirGapReceiverPort;
   }
 
+  public void setPipeReceiverLoginPeriodicVerificationIntervalMs(
+      long pipeReceiverLoginPeriodicVerificationIntervalMs) {
+    this.pipeReceiverLoginPeriodicVerificationIntervalMs =
+        pipeReceiverLoginPeriodicVerificationIntervalMs;
+  }
+
+  public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
+    return pipeReceiverLoginPeriodicVerificationIntervalMs;
+  }
+
   public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
     return pipeMaxAllowedHistoricalTsFilePerDataRegion;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ff17258c13f..529d1053c42 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -479,6 +479,12 @@ public class CommonDescriptor {
                 "pipe_air_gap_receiver_port",
                 Integer.toString(config.getPipeAirGapReceiverPort()))));
 
+    config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_receiver_login_periodic_verification_interval_ms",
+                
Long.toString(config.getPipeReceiverLoginPeriodicVerificationIntervalMs()))));
+
     config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4032bcc0af2..e652bff98f9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -224,6 +224,12 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAirGapReceiverPort();
   }
 
+  /////////////////////////////// Receiver ///////////////////////////////
+
+  public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
+    return COMMON_CONFIG.getPipeReceiverLoginPeriodicVerificationIntervalMs();
+  }
+
   /////////////////////////////// Hybrid Mode ///////////////////////////////
 
   public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
@@ -434,6 +440,10 @@ public class PipeConfig {
     LOGGER.info("PipeAirGapReceiverEnabled: {}", 
getPipeAirGapReceiverEnabled());
     LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
 
+    LOGGER.info(
+        "PipeReceiverLoginPeriodicVerificationIntervalMs: {}",
+        getPipeReceiverLoginPeriodicVerificationIntervalMs());
+
     LOGGER.info(
         "PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
         getPipeMaxAllowedHistoricalTsFilePerDataRegion());

Reply via email to