This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch air-gap-local
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/air-gap-local by this push:
new d3beee534aa some-part
d3beee534aa is described below
commit d3beee534aa597f5c5916dffb10cd604a77842d0
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 14:17:20 2026 +0800
some-part
---
.../protocol/airgap/IoTDBAirGapReceiver.java | 40 ++++++++++++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 8 +++++
2 files changed, 48 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 3cea6c998f8..0a10db6f433 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -146,6 +146,14 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
receiverId,
resp.getStatus());
ok();
+ } else if (status.getCode()
+ ==
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+ final long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime
+ < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
+ agent.receive(req);
+
Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs());
+ }
} else {
LOGGER.warn(
"Pipe air gap receiver {}: Handle data failed, status: {}, req:
{}",
@@ -171,6 +179,38 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
}
}
+ private void handleReq(final AirGapPseudoTPipeTransferRequest req) throws
IOException {
+ final TPipeTransferResp resp = agent.receive(req);
+
+ final TSStatus status = resp.getStatus();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ ok();
+ } else if (status.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+ || status.getCode()
+ ==
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+ LOGGER.info(
+ "Pipe air gap receiver {}: TSStatus {} is encountered at the air gap
receiver, will ignore.",
+ receiverId,
+ resp.getStatus());
+ ok();
+ } else if (status.getCode()
+ ==
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+ try {
+
Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs());
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ handleReq(req);
+ } else {
+ LOGGER.warn(
+ "Pipe air gap receiver {}: Handle data failed, status: {}, req: {}",
+ receiverId,
+ resp.getStatus(),
+ req);
+ fail();
+ }
+ }
+
private void ok() throws IOException {
final OutputStream outputStream = socket.getOutputStream();
outputStream.write(AirGapOneByteResponse.OK);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1ed1e39911f..12882cb83dd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -323,6 +323,14 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAirGapReceiverPort();
}
+ public long getPipeAirGapRetryLocalIntervalMs() {
+ return COMMON_CONFIG.getPipeAirGapRetryLocalIntervalMs();
+ }
+
+ public long getPipeAirGapRetryMaxMs() {
+ return COMMON_CONFIG.getPipeAirGapRetryMaxMs();
+ }
+
/////////////////////////////// Receiver ///////////////////////////////
public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {