This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 43eb55ad927 Pipe: Introduce periodic verification for username and
password on receiver side (#14764)
43eb55ad927 is described below
commit 43eb55ad92778c883fbc43962077f0b9b2b23bc8
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Feb 5 15:23:21 2025 +0800
Pipe: Introduce periodic verification for username and password on receiver
side (#14764)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 42 ++++++++++++++++------
.../apache/iotdb/commons/conf/CommonConfig.java | 12 +++++++
.../iotdb/commons/conf/CommonDescriptor.java | 6 ++++
.../iotdb/commons/pipe/config/PipeConfig.java | 10 ++++++
4 files changed, 60 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 80cbac6a3ef..ec0274a38fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferSliceReqHandler;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
@@ -175,6 +176,9 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private static final Set<String> ALREADY_CREATED_DATABASES =
ConcurrentHashMap.newKeySet();
private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+ private static final long LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS =
+
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
+ private long lastSuccessfulLoginTime = Long.MIN_VALUE;
static {
try {
@@ -492,14 +496,23 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
@Override
protected TSStatus tryLogin() {
final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
- if (clientSession == null || !clientSession.isLogin()) {
- return SESSION_MANAGER.login(
- SESSION_MANAGER.getCurrSession(),
- username,
- password,
- ZoneId.systemDefault().toString(),
- SessionManager.CURRENT_RPC_VERSION,
- IoTDBConstant.ClientVersion.V_1_0);
+ if (clientSession == null
+ || !clientSession.isLogin()
+ || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+ && lastSuccessfulLoginTime
+ < System.currentTimeMillis() -
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
+ final TSStatus status =
+ SESSION_MANAGER.login(
+ SESSION_MANAGER.getCurrSession(),
+ username,
+ password,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ lastSuccessfulLoginTime = System.currentTimeMillis();
+ }
+ return status;
}
return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@@ -808,7 +821,11 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
// Permission check
TSStatus permissionCheckStatus;
IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
- if (clientSession == null || !clientSession.isLogin()) {
+ if (clientSession == null
+ || !clientSession.isLogin()
+ || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+ && lastSuccessfulLoginTime
+ < System.currentTimeMillis() -
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
permissionCheckStatus = login();
if (permissionCheckStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return permissionCheckStatus;
@@ -887,6 +904,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
openSessionResp);
return RpcUtils.getStatus(openSessionResp.getCode(),
openSessionResp.getMessage());
}
+ lastSuccessfulLoginTime = System.currentTimeMillis();
return RpcUtils.SUCCESS_STATUS;
}
@@ -989,7 +1007,11 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
try {
// Permission check
final IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
- if (clientSession == null || !clientSession.isLogin()) {
+ if (clientSession == null
+ || !clientSession.isLogin()
+ || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+ && lastSuccessfulLoginTime
+ < System.currentTimeMillis() -
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
final TSStatus result = login();
if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return result;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 400b6822352..b2eeaf25f2c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -251,6 +251,8 @@ public class CommonConfig {
private boolean pipeAirGapReceiverEnabled = false;
private int pipeAirGapReceiverPort = 9780;
+ private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
+
private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
private int pipeMaxAllowedPinnedMemTableCount = 50;
@@ -997,6 +999,16 @@ public class CommonConfig {
return pipeAirGapReceiverPort;
}
+ public void setPipeReceiverLoginPeriodicVerificationIntervalMs(
+ long pipeReceiverLoginPeriodicVerificationIntervalMs) {
+ this.pipeReceiverLoginPeriodicVerificationIntervalMs =
+ pipeReceiverLoginPeriodicVerificationIntervalMs;
+ }
+
+ public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
+ return pipeReceiverLoginPeriodicVerificationIntervalMs;
+ }
+
public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
return pipeMaxAllowedHistoricalTsFilePerDataRegion;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ff17258c13f..529d1053c42 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -479,6 +479,12 @@ public class CommonDescriptor {
"pipe_air_gap_receiver_port",
Integer.toString(config.getPipeAirGapReceiverPort()))));
+ config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_receiver_login_periodic_verification_interval_ms",
+
Long.toString(config.getPipeReceiverLoginPeriodicVerificationIntervalMs()))));
+
config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4032bcc0af2..e652bff98f9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -224,6 +224,12 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAirGapReceiverPort();
}
+ /////////////////////////////// Receiver ///////////////////////////////
+
+ public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
+ return COMMON_CONFIG.getPipeReceiverLoginPeriodicVerificationIntervalMs();
+ }
+
/////////////////////////////// Hybrid Mode ///////////////////////////////
public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
@@ -434,6 +440,10 @@ public class PipeConfig {
LOGGER.info("PipeAirGapReceiverEnabled: {}",
getPipeAirGapReceiverEnabled());
LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
+ LOGGER.info(
+ "PipeReceiverLoginPeriodicVerificationIntervalMs: {}",
+ getPipeReceiverLoginPeriodicVerificationIntervalMs());
+
LOGGER.info(
"PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
getPipeMaxAllowedHistoricalTsFilePerDataRegion());