This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch air-gap-local
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/air-gap-local by this push:
new 3122f197b65 mipl
3122f197b65 is described below
commit 3122f197b652bf16fe5093a9d17c261294d4de23
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 17:13:41 2026 +0800
mipl
---
.../protocol/airgap/IoTDBAirGapReceiver.java | 30 +---------------------
.../apache/iotdb/commons/conf/CommonConfig.java | 8 +++++-
2 files changed, 8 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 49e642371c6..5d2e7013b34 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -133,35 +133,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
.setVersion(ReadWriteIOUtils.readByte(byteBuffer))
.setType(ReadWriteIOUtils.readShort(byteBuffer))
.setBody(byteBuffer.slice());
- final TPipeTransferResp resp = agent.receive(req);
-
- final TSStatus status = resp.getStatus();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- ok();
- } else if (status.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
- || status.getCode()
- ==
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
- LOGGER.info(
- "Pipe air gap receiver {}: TSStatus {} is encountered at the air
gap receiver, will ignore.",
- receiverId,
- resp.getStatus());
- ok();
- } else if (status.getCode()
- ==
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
- final long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime
- < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
- agent.receive(req);
-
Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs());
- }
- } else {
- LOGGER.warn(
- "Pipe air gap receiver {}: Handle data failed, status: {}, req:
{}",
- receiverId,
- resp.getStatus(),
- req);
- fail();
- }
+ handleReq(req, System.currentTimeMillis());
} catch (final PipeConnectionException e) {
LOGGER.info(
"Pipe air gap receiver {}: Socket {} closed when listening to data.
Because: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b5cf9cd1a3c..cf68da89553 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProper
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.enums.PipeRateAverage;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.rpc.RpcUtils;
@@ -1630,8 +1631,13 @@ public class CommonConfig {
logger.info("pipeAirGapRetryLocalIntervalMs is set to {}.",
pipeAirGapRetryLocalIntervalMs);
}
+ // < 0 : 0.8 * transfer timeout to avoid timeout
+ // = 0 : Disable retry
+ // > 0 : Explicit configuration
public long getPipeAirGapRetryMaxMs() {
- return pipeAirGapRetryMaxMs;
+ return pipeAirGapRetryMaxMs >= 0
+ ? pipeAirGapRetryMaxMs
+ : (long) (PipeConfig.getInstance().getPipeSinkTransferTimeoutMs() *
0.8);
}
public void setPipeAirGapRetryMaxMs(long pipeAirGapRetryMaxMs) {