This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 0657aff5a93 [To dev/1.3] Pipe: Introduce periodic verification for username and password on receiver side (#14764) (#14790)
0657aff5a93 is described below

commit 0657aff5a93a20828a242a6c3228ce1c62323ac6
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Feb 5 17:17:08 2025 +0800

    [To dev/1.3] Pipe: Introduce periodic verification for username and password on receiver side (#14764) (#14790)
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 36 ++++++++++++++++------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 12 ++++++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  6 ++++
 .../iotdb/commons/pipe/config/PipeConfig.java      | 10 ++++++
 4 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 6b6ea69de34..2817e8644ae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferSliceReqHandler;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
@@ -142,6 +143,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
   private final PipeTransferSliceReqHandler sliceReqHandler = new PipeTransferSliceReqHandler();
 
   private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+  private static final long LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS =
+      PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
+  private long lastSuccessfulLoginTime = Long.MIN_VALUE;
 
   static {
     try {
@@ -409,14 +413,23 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
   @Override
   protected TSStatus tryLogin() {
     final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
-    if (clientSession == null || !clientSession.isLogin()) {
-      return SESSION_MANAGER.login(
-          SESSION_MANAGER.getCurrSession(),
-          username,
-          password,
-          ZoneId.systemDefault().toString(),
-          SessionManager.CURRENT_RPC_VERSION,
-          IoTDBConstant.ClientVersion.V_1_0);
+    if (clientSession == null
+        || !clientSession.isLogin()
+        || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+            && lastSuccessfulLoginTime
+                < System.currentTimeMillis() - LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
+      final TSStatus status =
+          SESSION_MANAGER.login(
+              SESSION_MANAGER.getCurrSession(),
+              username,
+              password,
+              ZoneId.systemDefault().toString(),
+              SessionManager.CURRENT_RPC_VERSION,
+              IoTDBConstant.ClientVersion.V_1_0);
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        lastSuccessfulLoginTime = System.currentTimeMillis();
+      }
+      return status;
     }
     return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
@@ -675,7 +688,11 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
 
   private TSStatus executeStatement(final Statement statement) {
     IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
-    if (clientSession == null || !clientSession.isLogin()) {
+    if (clientSession == null
+        || !clientSession.isLogin()
+        || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+            && lastSuccessfulLoginTime
+                < System.currentTimeMillis() - LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
       final BasicOpenSessionResp openSessionResp =
           SESSION_MANAGER.login(
               SESSION_MANAGER.getCurrSession(),
@@ -692,6 +709,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
             openSessionResp);
         return RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
       }
+      lastSuccessfulLoginTime = System.currentTimeMillis();
       clientSession = SESSION_MANAGER.getCurrSession();
     }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index bb0fb08c1b8..50db0de3021 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -251,6 +251,8 @@ public class CommonConfig {
   private boolean pipeAirGapReceiverEnabled = false;
   private int pipeAirGapReceiverPort = 9780;
 
+  private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
+
   private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
   private int pipeMaxAllowedPinnedMemTableCount = 50;
@@ -992,6 +994,16 @@ public class CommonConfig {
     return pipeAirGapReceiverPort;
   }
 
+  public void setPipeReceiverLoginPeriodicVerificationIntervalMs(
+      long pipeReceiverLoginPeriodicVerificationIntervalMs) {
+    this.pipeReceiverLoginPeriodicVerificationIntervalMs =
+        pipeReceiverLoginPeriodicVerificationIntervalMs;
+  }
+
+  public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
+    return pipeReceiverLoginPeriodicVerificationIntervalMs;
+  }
+
   public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
     return pipeMaxAllowedHistoricalTsFilePerDataRegion;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ee9516ee7c8..c40429567c2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -479,6 +479,12 @@ public class CommonDescriptor {
                 "pipe_air_gap_receiver_port",
                 Integer.toString(config.getPipeAirGapReceiverPort()))));
 
+    config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_receiver_login_periodic_verification_interval_ms",
+                Long.toString(config.getPipeReceiverLoginPeriodicVerificationIntervalMs()))));
+
     config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
         Integer.parseInt(
             properties.getProperty(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4032bcc0af2..e652bff98f9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -224,6 +224,12 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAirGapReceiverPort();
   }
 
+  /////////////////////////////// Receiver ///////////////////////////////
+
+  public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
+    return COMMON_CONFIG.getPipeReceiverLoginPeriodicVerificationIntervalMs();
+  }
+
   /////////////////////////////// Hybrid Mode ///////////////////////////////
 
   public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
@@ -434,6 +440,10 @@ public class PipeConfig {
     LOGGER.info("PipeAirGapReceiverEnabled: {}", getPipeAirGapReceiverEnabled());
     LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
 
+    LOGGER.info(
+        "PipeReceiverLoginPeriodicVerificationIntervalMs: {}",
+        getPipeReceiverLoginPeriodicVerificationIntervalMs());
+
     LOGGER.info(
         "PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
         getPipeMaxAllowedHistoricalTsFilePerDataRegion());

Reply via email to